boost::asio async_read_until may read more than the specified data!

jwatte's picture

This took me a while to track down, so I'm documenting it for posterity and for anyone else doing asynchronous networking in C++ with boost::asio. I'm using Ubuntu Server 10.04 LTS with gcc/g++, which ships boost version 1.40. The root cause is this: the "bytes transferred" argument passed to the async_read_until() completion handler is the number of bytes up to and including the delimiter. However, the call may have read more bytes than that from the socket into the streambuf.

I'm trying to parse an HTTP request using boost::asio, by first reading the headers, and then reading the body. I do this as follows:

1) Read the request line and header, by using async_read_until() passing in the delimiter CR/LF/CR/LF -- I know that each line of the header is terminated by CRLF, and an additional CRLF then introduces the body (if any).
2) Parse out the Content-Length header from the received header, and convert to integer, so I know how many bytes of body data to read.
3) Use async_read() into a pre-allocated buffer with transfer_at_least set to the number of bytes I want. The buffer is no bigger, so I won't have to worry about over-reading here.

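However, as it turns out, async_read_until() does *not* consume only the data up to and including the delimiter. It may also pull an arbitrary amount of the data that follows into the streambuf. This is documented behavior: the asio reference notes that after a successful async_read_until operation, the streambuf may contain additional data beyond the delimiter. Thus, I have to locate the delimiter in the received data myself, and treat whatever follows it as the start of the body. (I'm wondering whether, under HTTP/1.1 pipelining, that may even include the start of the next request?)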

Here's the code I ended up with. Search for "WORKAROUND" to find the fix I had to make.

#include <algorithm>
#include <exception>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <error.h>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/lexical_cast.hpp>
 
// The single io_service that drives every asynchronous operation in this
// program; main() blocks in svc.run() until some handler calls svc.stop().
boost::asio::io_service svc;
// Listening socket for incoming HTTP connections. It is constructed from svc,
// so it must be declared after svc (globals in one translation unit are
// initialized in declaration order).
boost::asio::ip::tcp::acceptor acceptor(svc);
// Last connection id handed out; Conn's constructor pre-increments it, so the
// first connection gets id 1. main() runs svc.run() on a single thread, so no
// locking is used.
int gid = 0;
 
 
// Defined below; declared here because Conn::on_accept() calls it to queue
// the accept for the next client.
void acceptOne();
 
 
// Conn -- a connection from an HTTP client.
// Ownership of instances is implicit through boost::bind with shared_ptr:
// every pending asynchronous operation holds a shared_ptr<Conn>, so the
// connection lives exactly as long as some operation on it is outstanding.
//
// Flow: on_accept -> async_read_until("\r\n\r\n") -> on_readHeader
//       -> (only if needed) async_read of the rest of the body -> on_readData
//       -> reportResult -> async_write -> on_written (last reference dropped).
class Conn : public boost::enable_shared_from_this<Conn>
{
public:
    // Largest request body we are willing to buffer. Content-Length is
    // client-controlled, so without a cap a single request could make us
    // allocate an arbitrary amount of memory.
    static const size_t maxBodySize = 1024 * 1024;

    explicit Conn(boost::asio::io_service &svc) :
        id(++gid),
        socket_(svc)
    {
        std::cerr << "Connection " << id << " lives." << std::endl;
    }
    ~Conn()
    {
        std::cerr << "Connection " << id << " dies." << std::endl;
    }
    int id;
    boost::asio::ip::tcp::socket socket_;
    // headers go into buf_
    boost::asio::streambuf buf_;
    // body goes into data_
    std::vector<char> data_;
    // response goes into reply_ while being written
    std::string reply_;

    // the response has completed
    void on_written(boost::system::error_code const &err, size_t xfer)
    {
        if (!!err)
        {
            std::cerr << "write error " << err << std::endl;
        }
        else
        {
            std::cerr << "write success " << xfer << " bytes" << std::endl;
        }
        //  fall off the end will deallocate the conn
    }

    // report the result to the user (success or failure; body)
    // code: HTTP status code; data: plain-text response body.
    void reportResult(int code, std::string const &data)
    {
        std::cerr << "connection " << id << " responds " << code << " " << data << std::endl;
        std::stringstream ss;
        ss << "HTTP/1.1 " << code << " Complete\r\n";
        ss << "Connection: close\r\n";
        ss << "Content-Type: text/plain\r\n";
        ss << "Content-Length: " << data.size() << "\r\n";
        ss << "\r\n";
        ss << data;
        // reply_ is a member (kept alive by the shared_ptr bound below), so
        // the buffer stays valid until the write completes.
        reply_ = ss.str();
        // send on the socket; when done, call on_written()
        boost::asio::async_write(socket_, boost::asio::buffer(reply_),
            boost::asio::transfer_all(), boost::bind(&Conn::on_written, shared_from_this(),
                boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
    }

    // I have read the data of the body (based on Content-Length)
    void on_readData(boost::system::error_code const &err, size_t xfer)
    {
        if (!!err)
        {
            std::cerr << "error reading data: " << err << std::endl;
            svc.stop();
            return;
        }
        std::cerr << "data read " << xfer << " bytes." << std::endl;
        reportResult(200, "All is well.");
    }

    // I have read the header (terminated with \r\n\r\n).
    // xfer is the number of bytes up to and including the terminator, but
    // buf_ may already hold body bytes beyond it (see WORKAROUND below).
    void on_readHeader(boost::system::error_code const &err, size_t xfer)
    {
        if (!!err)
        {
            std::cerr << "error reading header: " << err << std::endl;
            svc.stop();
            return;
        }
        std::cerr << "header read " << xfer << " bytes." << std::endl;
        // Drain everything buffered so far: the header plus any body bytes
        // that arrived in the same socket read.
        std::ostringstream ss;
        ss << &buf_;
        std::string const data(ss.str());

        // Offset of the first body byte, i.e. just PAST "\r\n\r\n".
        // async_read_until only completes once the terminator is buffered,
        // so find() succeeds; the npos fallback is purely defensive.
        size_t const term = data.find("\r\n\r\n");
        size_t const hdrEnd = (term == std::string::npos) ? data.size() : term + 4;

        //  I have read the header -- now look for the content length.
        //  Search only the header, so body bytes can't be mistaken for a
        //  header line.
        //  Note: this doesn't work with chunked content transfer encoding,
        //  and the header name is matched case-sensitively.
        std::string const header(data, 0, hdrEnd);
        size_t const pos = header.find("Content-Length: ");
        if (pos == std::string::npos)
        {
            // a request without body is not what I want right now
            std::cerr << "header does not contain Content-Length." << std::endl;
            reportResult(400, "expected Content-Length");
            return;
        }
        size_t const valStart = pos + 16;  // 16 == length of "Content-Length: "
        size_t const valEnd = header.find_first_of("\r\n", valStart);
        size_t clen = 0;
        try
        {
            clen = boost::lexical_cast<size_t>(header.substr(valStart, valEnd - valStart));
        }
        catch (boost::bad_lexical_cast const &)
        {
            // An exception escaping this handler would propagate out of
            // svc.run() and take the whole server down; reject instead.
            std::cerr << "malformed Content-Length." << std::endl;
            reportResult(400, "malformed Content-Length");
            return;
        }
        if (clen > maxBodySize)
        {
            std::cerr << "Content-Length " << clen << " exceeds limit." << std::endl;
            reportResult(413, "Content-Length too large");
            return;
        }
        data_.resize(clen);

        //  WORKAROUND: async_read_until may actually read more than up to
        //  the requested terminator. Copy the body bytes we already got into
        //  the buffer -- but never more than clen; anything beyond that is
        //  not part of this request's body and is ignored.
        size_t const have = std::min(data.size() - hdrEnd, clen);
        std::copy(data.begin() + hdrEnd, data.begin() + hdrEnd + have, data_.begin());
        if (have == clen)
        {
            //  I already got everything -- call through to success
            //  (this also covers clen == 0, where &data_[0] would be invalid)
            on_readData(boost::system::error_code(), clen);
            return;
        }
        //  END WORKAROUND: read whatever else was missing
        std::cerr << "reading " << (clen - have) << " more bytes of body." << std::endl;
        // Fill exactly data_[have, clen); call on_readData() when done.
        boost::asio::async_read(socket_,
            boost::asio::buffer(&data_[have], clen - have),
            boost::asio::transfer_all(),
            boost::bind(&Conn::on_readData, shared_from_this(),
                boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
    }

    // The acceptor accepted a connection on my socket
    void on_accept(boost::system::error_code const &err)
    {
        if (!!err)
        {
            std::cerr << "error accepting connection: " << err << " " << err.message() << std::endl;
            svc.stop();
            return;
        }
        // start a new accept operation for the next client
        acceptOne();
        // read the header, terminated by \r\n\r\n, and call on_readHeader() when done
        boost::asio::async_read_until(socket_, buf_, "\r\n\r\n",
            boost::bind(&Conn::on_readHeader, shared_from_this(),
                boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
    }
};
 
// Start accepting a new incoming connection from a client
void acceptOne()
{
    // the Conn object is pre-allocated for the next client.
    // lifetime is implicit using shared_ptr (as long as there's a boost::bind() referencing it)
    boost::shared_ptr<Conn> conn(new Conn(svc));
    acceptor.async_accept(conn->socket_,
        boost::bind(&Conn::on_accept, conn, boost::asio::placeholders::error));
}
 
// Program entry point: listen for HTTP clients on TCP port 8080 and serve
// them until some handler calls svc.stop().
// Returns 0 after a normal stop, 1 if setup fails or an exception escapes
// the event loop.
int main()
{
    try
    {
        // set up for listening to connections on port 8080
        acceptor.open(boost::asio::ip::tcp::v4());
        // Allow an immediate restart while old connections linger in
        // TIME_WAIT; without this, bind() fails with "Address already in use".
        acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
        acceptor.bind(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 8080));
        acceptor.listen();
        acceptOne();

        // someone calling svc.stop() will terminate this
        std::cerr << "calling svc.run()" << std::endl;
        svc.run();
    }
    catch (std::exception const &e)
    {
        // bind/listen failures (boost::system::system_error) and exceptions
        // escaping a handler land here; report them rather than letting
        // std::terminate abort the process without context.
        std::cerr << "fatal: " << e.what() << std::endl;
        return 1;
    }
    std::cerr << "main() done." << std::endl;
    return 0;
}

You can test this from the command line:

curl -d foo=bar localhost:8080