Работа с boost::asio::streambuf

В поисках boost::asio (и вместе с ним boost) решил написать асинхронный сервер. Для хранения входящих данных я использую boost::asio::streambuf. Вот у меня проблема. Когда я получаю второе сообщение от клиента и последующие, я вижу, что в буфере есть данные из предыдущих сообщений. Хотя я вызываю метод Consume во входном буфере. Что случилось?

class tcp_connection
// Using shared_ptr and enable_shared_from_this 
// because we want to keep the tcp_connection object alive 
// as long as there is an operation that refers to it.
: public boost::enable_shared_from_this<tcp_connection>
{
...

boost::asio::streambuf receive_buffer;

boost::asio::io_service::strand strand;
}

...

void tcp_connection::receive()
{
// Read the response status line. The response_ streambuf will
// automatically grow to accommodate the entire line. The growth may be
// limited by passing a maximum size to the streambuf constructor.
boost::asio::async_read_until(m_socket, receive_buffer, "\r\n",
    strand.wrap(boost::bind(&tcp_connection::handle_receive, shared_from_this()/*this*/,
    boost::asio::placeholders::error,
    boost::asio::placeholders::bytes_transferred)));

}


void tcp_connection::handle_receive(const boost::system::error_code& error, 
std::size_t bytes_transferred)
{

if (!error)
{
    // process the data

    /*  boost::asio::async_read_until remarks

    After a successful async_read_until operation, 
    the streambuf may contain additional data beyond the delimiter.
    An application will typically leave that data in the streambuf for a
    subsequent async_read_until operation to examine.
    */

    /* didn't work      
    std::istream is(&receive_buffer);
    std::string line;
    std::getline(is, line); 
    */


    // clean up incomming buffer but it didn't work 
    receive_buffer.consume(bytes_transferred);  

    receive(); 

}
else if (error != boost::asio::error::operation_aborted)
{
    std::cout << "Client Disconnected\n";

    m_connection_manager.remove(shared_from_this());
}
}

person vint    schedule 12.02.2015    source источник


Ответы (1)


Либо используя std::istream и читая из него, например, std::getline() или явный вызов boost::asio::streambuf::consume(n) удалит данные из входной последовательности.
Если приложение выполняет какую-либо из этих и последующие read_until() операции, приводит к дублированию данных во входной последовательности receive_buffer, тогда дублированные данные, вероятно, исходят от удаленного партнера. Если удаленный узел выполняет запись в сокет и напрямую использует входную последовательность streambuf, то удаленный узел должен явно вызывать consume() после каждой успешной операции записи.


Как указано в документации, успешные операции read_until() могут содержат дополнительные данные за пределами разделителя, включая дополнительные разделители. Например, если "a@b@" записывается в сокет, операция read_until(), использующая '@' в качестве разделителя, может прочитать и зафиксировать "a@b@" во входной последовательности streambuf. Однако операция укажет, что количество переданных байтов соответствует первому разделителю включительно. Таким образом, bytes_transferred будет 2, а streambuf.size() будет 4. После того, как будет использовано 2 байт, входная последовательность streambuf будет содержать "b@", а последующий вызов read_until() немедленно вернет результат, так как streambuf уже содержит разделитель.

Вот полный пример, демонстрирующий использование streambuf для чтения и записи и использование входной последовательности:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>

// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}

std::string make_string(boost::asio::streambuf& streambuf)
{
 return {buffers_begin(streambuf.data()), 
         buffers_end(streambuf.data())};
}

int main()
{
  using boost::asio::ip::tcp;
  boost::asio::io_service io_service;

  // Create all I/O objects.
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
  tcp::socket server_socket(io_service);
  tcp::socket client_socket(io_service);

  // Connect client and server sockets.
  acceptor.async_accept(server_socket, boost::bind(&noop));
  client_socket.async_connect(acceptor.local_endpoint(), boost::bind(&noop));
  io_service.run();

  // Write to server.
  boost::asio::streambuf write_buffer;
  std::ostream output(&write_buffer);
  output << "a@"
            "b@";
  write(server_socket, write_buffer.data());
  std::cout << "Wrote: " << make_string(write_buffer) << std::endl;
  assert(write_buffer.size() == 4);  // Data not consumed.

  // Read from the client.
  boost::asio::streambuf read_buffer;

  // Demonstrate consuming via istream.
  {
    std::cout << "Read" << std::endl;
    auto bytes_transferred = read_until(client_socket, read_buffer, '@');
    // Verify that the entire write_buffer (data pass the first delimiter) was
    // read into read_buffer.
    auto initial_size = read_buffer.size();
    assert(initial_size == write_buffer.size());

    // Read from the streambuf.
    std::cout << "Read buffer contains: " << make_string(read_buffer)
              << std::endl;
    std::istream input(&read_buffer);
    std::string line;
    getline(input, line, '@'); // Consumes from the streambuf.
    assert("a" == line); // Note getline discards delimiter.
    std::cout << "Read consumed: " << line << "@" << std::endl;
    assert(read_buffer.size() == initial_size - bytes_transferred);
  }

  // Write an additional message to the server, but only consume 'a@'
  // from write buffer.  The buffer will contain 'b@c@'.
  write_buffer.consume(2);
  std::cout << "Consumed write buffer, it now contains: " <<
                make_string(write_buffer) << std::endl;
  assert(write_buffer.size() == 2);
  output << "c@";
  assert(write_buffer.size() == 4);
  write(server_socket, write_buffer.data());
  std::cout << "Wrote: " << make_string(write_buffer) << std::endl;

  // Demonstrate explicitly consuming via the streambuf.
  {
    std::cout << "Read" << std::endl;
    auto initial_size = read_buffer.size();
    auto bytes_transferred = read_until(client_socket, read_buffer, '@');
    // Verify that the read operation did not attempt to read data from
    // the socket, as the streambuf already contained the delimiter.
    assert(initial_size == read_buffer.size());

    // Read from the streambuf.
    std::cout << "Read buffer contains: " << make_string(read_buffer)
              << std::endl;
    std::string line(
        boost::asio::buffers_begin(read_buffer.data()),
        boost::asio::buffers_begin(read_buffer.data()) + bytes_transferred);
    assert("b@" == line);
    assert(read_buffer.size() == initial_size); // Nothing consumed.
    read_buffer.consume(bytes_transferred); // Explicitly consume.
    std::cout << "Read consumed: " << line << std::endl;
    assert(read_buffer.size() == 0);
  }

  // Read again.
  {
    std::cout << "Read" << std::endl;
    read_until(client_socket, read_buffer, '@');

    // Read from the streambuf.
    std::cout << "Read buffer contains: " << make_string(read_buffer)
              << std::endl;
    std::istream input(&read_buffer);
    std::string line;
    getline(input, line, '@'); // Consumes from the streambuf.
    assert("b" == line); // Note "b" is expected and not "c".
    std::cout << "Read consumed: " << line << "@" << std::endl;
    std::cout << "Read buffer contains: " << make_string(read_buffer)
              << std::endl;
  }
}

Вывод:

Wrote: a@b@
Read
Read buffer contains: a@b@
Read consumed: a@
Consumed write buffer, it now contains: b@
Wrote: b@c@
Read
Read buffer contains: b@
Read consumed: b@
Read
Read buffer contains: b@c@
Read consumed: b@
Read buffer contains: c@
person Tanner Sansbury    schedule 13.02.2015
comment
Спасибо, это очень хороший пример работы со streambuf. Нашел почему после отправки вижу данные из предыдущих сообщений. На клиенте функция Send() использует глобальный буфер streambuf (send_buffer в моем примере). Как мне сделать, чтобы при каждом вызове Send() использовался собственный объект streambuf? Что-то вроде boost::make_shared? Это правильный подход? У вас есть хороший пример сервера и клиента, использующих boost:: asio? Вот что в документации Boost меня не устраивает, там используются буферы постоянной длины, и хотелось бы увидеть пример использования streambuf. - person vint; 16.02.2015
comment
Круто, очень полезный ответ. Я бы хотел, чтобы люди из Boost заставили вас написать всю их документацию! ;-) - person Dronz; 23.02.2015
comment
Как вы получили доступ к входной последовательности, даже не зафиксировав (commit())? документация по данным() говорит, что data() возвращает список входных буферов, и согласно документация по фиксации() фактически копирует символы из выходной последовательности во входную последовательность. - person Cengiz Kandemir; 27.07.2016
comment
@Cengiz При использовании операций Asio, которые принимают streambuf (а не буфер, представляющий одну из его последовательностей), или при использовании объектов потока, которые используют streambuf, например std::ostream, базовые входные и выходные последовательности будут правильно управляться. Этот ответ может содержать более подробную информацию. Кроме того, в документации для commit() упоминается перемещение, а не копирование данных. Текущая реализация использует один непрерывный массив, где commit() продвигает как указатель начала выходной последовательности, так и указатель конца входной последовательности на n символов. - person Tanner Sansbury; 27.07.2016
comment
@Tanner, спасибо за быстрый ответ. Поскольку вопрос немного устарел, я ожидал немного подождать. - person Cengiz Kandemir; 27.07.2016
comment
@Tanner Если ты все еще рядом. См.: stackoverflow.com/questions/46980927/ - person Christopher Pisz; 27.10.2017