Асинхронные режимы фреймворка gRPC и принципы их работы в С++

Однажды мы с нашей командой решили попробовать gRPC для своих задач. После некоторых обсуждений, пришли к выводу, что будем использовать асинхронные клиент и сервер. Однако, под рукой оказался рабочий пример из документации только для одного режима. Примеры остальных режимов взаимодействия, основные принципы работы асинхронных операций, принципы работы асинхронного сервера и клиента в gRPC и многое другое под катом.

Введение


Цель данной статьи — объяснение принципов работы асинхронных операций фреймворка gRPC в C++ для всех четырех режимов взаимодействия. Те читатели, которые хотят использовать асинхронные режимы gRPC, но не до конца понимают как они реализуются на сервере и/или клиенте, смогут найти объяснение ниже. Читатели, которые понимают принципы работы асинхронного сервера и клиента, но не имеют под рукой рабочих примеров всех четырех режимов взаимодействия, также найдут примеры в статье. Читателей, которые все поняли и получили рабочие примеры — прошу в комментарии.

План статьи следующий:

  1. Отличия многопоточной и асинхронной моделей приложений;
  2. Асинхронная модель приложений применительно к gRPC;
  3. Используемая терминология;
  4. Исходные данные;
  5. Примеры асинхронных режимов взаимодействия;
  6. Заключение и список использованных источников.

1. Отличия многопоточной и асинхронной моделей приложений


Рассмотрим, в чем разница многопоточной и асинхронной архитектур приложений.
Когда к серверу подключаются более одного клиента, вариантов для обработки подключений здесь ровно два. Первый вариант — это создание отдельного потока на каждое подключение, т.е. обслуживание каждого клиента в собственном потоке. Второй вариант — это использование асинхронной модели приложения. Асинхронная модель приложения неразрывно связана с понятием очереди событий (event loop). Под событием понимается некоторая логически завершенная операция на уровне ядра: на сокете появилось новое подключение, от клиента получено новое сообщение, сообщение отправлено клиенту и т.д. У.Р. Стивенс в своей книге «Unix: Разработка сетевых приложений» про асинхронную модель ввода-вывода пишет следующее:
Мы сообщаем ядру, что нужно начать операцию и уведомить нас о том, когда вся операция (включая копирование данных из ядра в наш буфер) завершится.
Рассмотрим простой пример сервера, который обрабатывает подключение клиента в три этапа. Первый этап — обработка подлючения. На этом этапе не происходит операций с файловой системой, вводом-выводом и БД. Второй этап — синхронное чтение данных с сокета. И третий этап — обработка полученных данных. Условия такие же, как на первом этапе. Для простоты будем считать, что первый и третий этапы занимают 1 мс, второй этап занимает 2 мс.
Теперь допустим, что к серверу подключилось несколько клиентов.

При многопоточной модели, на каждое подключение создается отдельный поток. Каждый поток тратит 4 мс на обработку запроса клиента. Однако, при чтении данных с сокета, поток начинает ждать завершения операции. Пока данные с сокета не считаются в буфер приложения, поток будет заблокирован. Получается, что 2мс из общих 4 мс — 50% времени работы потока — процессор будет простаивать.

Для решения проблемы простоя процессора, придумали асинхронные операции ввода-вывода и сопутствующие этим операциям асинхронные модели приложений.

Асинхронная модель приложения основывается на очереди событий. При возникновении какого-либо события, это событие помещается в конец очереди. Пока в очереди есть события, процессор будет всегда занят работой. Операции ввода-вывода, связанные с событиями в очереди, должны быть асинхронными.

В примере с трехэтапным сервером нужно заменить синхронное чтение данных с сокета на асинхронное. Конечно, до тех пор пока данные с сокета не считаются, мы не сможем перейти к третьему этапу. Тем не менее, после помещения события окончания чтения данных с сокета в конец очереди, можно извлекать события из очереди и выполнять связанные с ними операции. Например, за те 2 мс, пока происходит считывание данных с сокета первого клиента, можно выполнить обработку подключения и запрос на чтения данных с сокета для второго и третьего клиентов. После этого из очереди поступит событие завершения чтения данных с сокета для первого клиента.

В целом, в асинхронной модели каждый запрос клиента может выполняться медленнее, чем в многопоточной модели. Однако, общая производительность при использовании асинхронной модели оказывается выше за счет уменьшения времени простоя процессора.

В этом месте нужно обратить внимание на несколько моментов. Первый момент касается того, что все операции в асинхронной модели выполняются в одном потоке. В отличии от многопоточной модели. В связи с этим возникает второй момент: если в очереди событий находятся ресурсоемкие операции, то во время их выполнения остальные операции будут простаивать в очереди. Есть два варианта решения этой проблемы. Первый вариант — создание отдельных потоков на ресурсоемкие операции из очереди. Второй вариант — разбиение ресурсоемких операции на несколько частей, которые будут вызываться из очереди.

2. Асинхронная модель приложений применительно к gRPC


При использовании синхронных RPC на сервере, в gRPC используется многопоточная модель приложения. Каждый клиентский запрос обрабатывается в отдельном потоке.

При использовании асинхронных RPC, в gRPC используется асинхронная модель приложения как на сервере, так и на клиенте. Согласно принципу асинхронной модели, существует очередь событий. В gRPC очередь событий реализована в классе grpc::CompletionQueue. Добавление событий в очередь происходит внутри gRPC, а запросы на добавление и считывание событий из очереди — в пользовательском приложении.

В терминологии gRPC респондер (responder) — это «средство ответа клиенту» на сервере, и «средство приема сообщений от сервера» на клиенте.

Запрос на добавление события в очередь событий происходит при вызове любой функции из интерфейса респондера. Под событием, как и ранее, понимается логически завершенная операция. Например, событие подключения клиента к серверу означает, что клиент уже подключился к серверу; событие приема сообщения от клиента означает, что сообщение уже находится в буфере приложения и т.д.

При запросе добавления события в очередь, каждому событию присваивается тэг. Это сделано для того, чтобы можно было отличать события, считанные из очереди. Строго говоря, физически из очереди считывается не событие, а ассоциированный с этим событием тэг. Например, пусть сервер делает запрос на добавление события приема сообщения от клиента и присваивает этому событию тэг = 1. Когда из очереди вернется тэг = 1, это будет означать, что произошло событие приема сообщение от клиента, т.е. сообщение от клиента считано в указанный буфер.

3. Используемая терминология


В gRPC существует четыре вида методов, которые могут быть определены в сервисе.

  • Unary RPCs. Клиент делает запрос на сервер в виде одного сообщения и получает ответ в виде одного сообщения от сервера. Далее мы будем называть эти методы ONE-ONE, а классы, реализующие эти методы без суффиксов.
  • Server streaming RPCs. Клиент делает запрос на сервер в виде одного сообщения и получает ответ в виде последовательности сообщений от сервера. Далее мы будем называть эти методы ONE-MANY, а классы, реализующие эти методы — с суффиксом «1M».
  • Сlient streaming RPCs. Клиент делает запрос на сервер в виде последовательности сообщений и получает ответ в виде одного сообщения от сервера. Далее мы будем называть эти методы MANY-ONE, а классы, реализующие эти методы — с суффиксом «M1».
  • Bidirectional streaming RPCs. Клиент делает запрос на сервер в виде последовательности сообщений и получает ответ в виде последовательности сообщений от сервера. Далее мы будем называть эти методы MANY-MANY, а классы, реализующие эти методы — с суффиксом «MM».

На уровне терминологии договоримся, что фраза “после следующего получения тэга из очереди” будет опускаться. Для асинхронных операций gRPC существует правило: одна операция респондера (включая создание и удаление) за одно получение тэга из очереди. Выше было сказано, что при вызове функций респондера делается запрос на добавление события в очередь событий. Поэтому в терминологии асинхронной модели правило будет звучать так: один запрос на добавление события в очередь за одно считывание события из очереди.

Это означает, что будет правильно: дождаться получение тэга из очереди — вызвать функцию А респондера — дождаться получение тэга из очереди — вызвать функцию B респондера — дождаться получение тэга из очереди — удалить объект респондера. И будет неправильно любая из комбинаций, где между вызовами функций нет “получения тэга из очереди”, например: дождаться получение тэга из очереди — вызвать функцию A респондера — вызвать функцию B респондера — дождаться получение тэга из очереди — удалить объект. Поэтому фраза “после следующего получения тэга из очереди” при дальнейшем изложении будет опущена.
На уровне терминологии будем употреблять «виды методов» и «режимы взаимодействия» как синонимы.

4. Исходные данные


Файл helloworld.proto


В качестве исходных данных возьмем сервис Greeter и дополним его тремя методами на каждый вид взаимодействия (файл helloworld.proto):

  syntax = "proto3";
  package helloworld;
  // The greeting service definition.
  service Greeter{
    rpc SayHello (HelloRequest) returns (HelloReply) {}
    rpc GladToSeeMe(HelloRequest) returns (stream HelloReply){}
    rpc GladToSeeYou(stream HelloRequest) returns (HelloReply){}
    rpc BothGladToSee(stream HelloRequest) returns (stream HelloReply){}
  }
  // The request message containing the user's name.
  message HelloRequest {
    string name = 1;
  }
  // The response message containing the greetings
  message HelloReply {
    string message = 1;
  }

Метод GladToSeeMe реализует режим ONE-MANY. Метод GladToSeeYou реализует режим MANY-ONE. Метод BothGladToSee реализует режим MANY-MANY.

Базовый класс на сервере


Единственное, что будет отличаться во всех четырех видах методов — это режим взаимодействия с клиентом. За режим взаимодействия отвечает респондер соответствующего типа. Все остальные параметры (сервис, очередь сообщений, контекст и т.д.) в данном случае являются общими для всех четырех видов методов. Поэтому общие для всех четырех режимов данные помещаются в базовый абстрактный класс, от которого классы соответствующих режимов будут наследоваться:

class CommonCallData
{
public:
  Greeter::AsyncService* service_;
  ServerCompletionQueue* cq_;
  ServerContext ctx_;
  HelloRequest request_;
  HelloReply reply_;
  enum CallStatus { CREATE, PROCESS, FINISH };
  CallStatus status_; 
  std::string prefix;
public:
  explicit CommonCallData(Greeter::AsyncService* service, ServerCompletionQueue* cq):
  service_(service), cq_(cq),status_(CREATE),prefix("Hello ")
  {}
  virtual ~CommonCallData(){}
  virtual void Proceed(bool = true) = 0;
};

Абстрактный класс CommonCallData содержит общие для каждого из режима данные (часть из которых инициализируется в конструкторе), виртуальный деструктор и чисто виртуальную функцию Proceed().

Базовый класс на клиенте


Исключительно для удобства чтения кода, классы для каждого вида взаимодействия на клиенте будут выглядеть аналогично соответствующим классам на сервере. Каждый из классов взамодействия наследуется от базового абстрактного класса CommonAsyncClientCall:

class CommonAsyncClientCall
{
public:
  ClientContext context;
  HelloReply reply;
  enum CallStatus { CREATE, PROCESS, FINISH };
  CallStatus callStatus ;
  Status status;
  void printReply(const char* from)
  {
    if(!reply.message().empty())
      std::cout << "[" << from << "]: reply message = " << reply.message() << std::endl;
    else
      std::cout << "[" << from << "]: reply message empty" << std::endl;
  }
  explicit CommonAsyncClientCall():callStatus(CREATE){}
  virtual ~CommonAsyncClientCall(){}
  virtual void Proceed(bool = true) = 0;
};

Класс CommonAsyncClientCall содержит общие для всех видов взаимодействия данные, конструктор, виртуальных деструктор, чисто виртуальную функцию Proceed() и функцию вывода ответа от сервера PrintReply().

5. Примеры асинхронных режимов взаимодействия


Режим взаимодействия один к одному


Код на сервере


Реализация класса CallData для режима взаимодействия один к одному будет выглядеть следующим образом:

class CallData: public CommonCallData
{
  ServerAsyncResponseWriter<HelloReply> responder_;
public:
  CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq):
  CommonCallData(service, cq), responder_(&ctx_){Proceed();}
  virtual void Proceed(bool = true) override
  {
    if (status_ == CREATE)
    {
      std::cout << "[Proceed11]: New responder for 1-1 mode" << std::endl;
      status_ = PROCESS;
      service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
    }
    else if (status_ == PROCESS)
    {
      new CallData(service_, cq_);
      std::cout << "[Proceed11]: request message = " << request_.name() << std::endl;
      reply_.set_message(prefix + request_.name());
      status_ = FINISH;
      responder_.Finish(reply_, Status::OK, this);
    }
    else
    {
      GPR_ASSERT(status_ == FINISH);
      std::cout << "[Proceed11]: Good Bye" << std::endl;
      delete this;
    }
  }
};

Класс CallData наследуется от базового класса и реализует функцию Proceed(). Из членов класса в классе CallData присутствует только респондер соответствующего типа (ServerAsyncResponseWriter<HelloReply>). Аргументы конструктора CallData передаются в конструктор базового класса CommonCallData. Так как все четыре класса соответствующих режимов будут иметь свою реализацию Proceed(), указатель void* tag нужно теперь приводить не к типу CallData, а к типу CommonCallData:

while(true)
{
  GPR_ASSERT(cq_->Next(&tag, &ok));
  //GPR_ASSERT(ok);
  static_cast<CommonCallData*>(tag)->Proceed(ok);
}

Заметьте, что теперь проверка GPR_ASSERT(ok) закомментирована. Об этом будет рассказано ниже.

Код на клиенте


Реализация класса AsyncClientCall для режима взаимодействия один к одному будет выглядеть следующим образом:

class AsyncClientCall: public CommonAsyncClientCall
{
  std::unique_ptr< ClientAsyncResponseReader<HelloReply> > responder;
public:
  AsyncClientCall(const HelloRequest& request, CompletionQueue& cq_, std::unique_ptr<Greeter::Stub>& stub_):CommonAsyncClientCall()
  {
    std::cout << "[Proceed11]: new client 1-1" << std::endl;
    responder = stub_->AsyncSayHello(&context, request, &cq_);
    responder->Finish(&reply, &status, (void*)this);
    //callStatus = PROCESS ;
  }
  virtual void Proceed(bool ok = true) override
  {
    GPR_ASSERT(ok);
    if(status.ok())
      printReply("Proceed11");
    std::cout << "[Proceed11]: Good Bye" << std::endl;
    delete this;
  }
};

Класс AsyncClientCall наследуется от класса CommonAsyncClientCall, точно также, как в случае с сервером, имеет соответствующий режиму взаимодействию респондер и реализует чисто виртуальную функцию Proceed(). Код функции Proceed() совпадает с кодом, приведенным выше для соответствующего режима, который выполнялся в функции GreeterClient::AsyncCompleteRpc().

Код, который раньше находился в функции GreeterClient::SayHello() теперь находится в конструкторе класса AsyncClientCall. С точки зрения логики, такая реализация не является правильной и сделана для обеспечения сходства с реализацией сервера. Далее все классы на клиенте будут реализованы по такому же принципу. При желании всегда можно структурировать код так, как это сделано в документации.

Так как мы перенесли код из функции GreeterClient::SayHello() в конструктор класса AsyncClientCall, код фунции GreeterClient::SayHello() будет выглядеть так:

void SayHello(const std::string& user)
{
  HelloRequest request;
  request.set_name(user);
  new AsyncClientCall(request, cq_, stub_);
}

Код функции GreeterClient::AsyncCompleteRpc() теперь выглядит так:
void AsyncCompleteRpc()
{
  void* got_tag;
  bool ok = false;
  while(cq_.Next(&got_tag, &ok))
  {
    static_cast<CommonAsyncClientCall*>(got_tag)->Proceed(ok);
  }
  std::cout << "Completion queue is shutting down." << std::endl;
}

Здесь становится видно очевидное сходство с кодом на сервере. Точно также из очереди возвращается тэг, поставленный в соответствие новому ответу от сервера. Точно также тэг приводится к указателю на базовый класс CommonAsyncClientCall, точно также вызывается соответствующая режиму реализация функции Proceed().

Консольный вывод сервера и клиента в режиме один к одному


Консольный вывод сервера для режима один к одному выглядит следующим образом:

[Proceed11]: New responder for 1-1 mode
[Proceed11]: New responder for 1-1 mode
[Proceed11]: request message = world
[Proceed11]: Good Bye

Сначала создается первый респондер. После того, как получен запрос от клиента, создается второй респондер, который будет обрабатывать следующие запросы. Запрос от клиента содержит сообщение “world”. Делается запрос на отправление клиенту сообщения функцией респондера Finish(). При следующем получении объекта в состоянии FINISH из очереди, объект удаляется.

Консольный вывод клиента для режима один к одному выглядит следующим образом:

[Proceed11]: new client 1-1
[Proceed11]: reply message = Hello world
[Proceed11]: Good Bye

Сначала создается новый клиент. Когда сообщение от сервера получено, оно выводится на экран. После этого объект удаляется.

Режим взаимодействия один ко многим


Код на сервере


Класс CallData1M, отвечающий за режим взаимодействия один ко многим на сервере выглядит следующим образом:

class CallData1M: public CommonCallData
{
  ServerAsyncWriter<HelloReply> responder_;unsigned mcounter;
  bool new_responder_created;
  public:
  CallData1M(Greeter::AsyncService* service, ServerCompletionQueue* cq):
  CommonCallData(service, cq), responder_(&ctx_), mcounter, new_responder_created(false){ Proceed() ;}
  virtual void Proceed(bool = true) override
  {
    if(status_ == CREATE)
    {
      std::cout << "[Proceed1M]: New responder for 1-M mode" << std::endl;
      service_->RequestGladToSeeMe(&ctx_, &request_, &responder_, cq_, cq_, this);
      status_ = PROCESS ;
    }
    else if(status_ == PROCESS)
    {
      if(!new_responder_created)
      {
        new CallData1M(service_, cq_);
        new_responder_created = true ;
        std::cout << "[Proceed1M]: request message = " << request_.name() << std::endl;
      }
      static std::vector<std::string> greeting = {std::string(prefix + request_.name() + "!"),
                            "I'm very glad to see you!",
                            "Haven't seen you for thousand years.",
                            "I'm server now. Call me later."};
      if(mcounter >= greeting.size())
      {
        std::cout << "[Proceed1M]: Trying finish" << std::endl;
        status_ = FINISH;
        responder_.Finish(Status(), (void*)this);
      }
      else
      {
        reply_.set_message(greeting.at(mcounter));
        std::cout << "[Proceed1M]: Writing" << std::endl;
        responder_.Write(reply_, (void*)this);
        ++mcounter;
      }
    }
    else // if(status_ == FINISH)
    {
      std::cout << "[Proceed1M]: Good Bye" << std::endl;
      delete this;
    }
  }
};

Класс CallData1M наследуется от базового класса CommonCallData, содержит три члена класса:

  • респондер ServerAsyncWriter<HelloReply>;
  • счетчик отправленных сообщений unsigned mcounter;
  • флаг того, что новый респондер был создан bool new_responder_created

Все члены класса инициализируются в списке инициализации конструктора, после чего вызывается реализация функции Proceed(). Изначально все респондеры имеют состояние CREATE, поэтому мы попадаем в условие if(status_ == CREATE), в котором в качестве тэга на событие подключения нового клиента указываем текущего респондера. После этого респондер переходит в состояние PROCESS. Когда очередной запрос клиента будет получен, из очереди вернется тэг, ассоциированный с этим событием. Тэг, полученный из очереди, инициализирован указателем на объект респондера с состоянием PROCESS. Поэтому мы попадем в условие else if(status_ == PROCESS). При первом попадании в это условие будет создан новый объект респондера для обработки следующих запросов клиентов. После этого создается статический вектор с ответами сервера. Изначально переменная mcounter равна нулю, поэтому мы попадаем в условие else. В условии else в текст ответа устанавливается сообщение из вектора ответов под номером mcounter, после чего ответ отправляется клиенту и переменная mcounter инкрементируется.

Отправка сообщений клиенту будет происходить ровно (greeting.size()-1) раз. Мы не можем отправить все ответы за один раз, потому что это противоречит принципам асинхронной модели, на основе которых реализованы асинхронные режимы в gRPC. Из документации:
Only one write may be outstanding at any given time. This means that after calling Write, one must wait to receive tag from the completion queue BEFORE calling Write again.
что означает примерно следующее: Только одна запись может быть выполнена за один раз, т.е. после вызова функции Write нужно дождаться получение тэга из очереди ПЕРЕД тем как вызвать функцию Write снова. Именно поэтому объекты класса CallData1M хранят информацию о количестве отправленных сообщений.

После того, как все сообщения из вектора будут отправлены, мы попадем в условие if(mcounter >= greeting.size()). В этом условии состояние респондера меняется на FINISH, после чего вызывается функция Finish().

В следующий раз в функции Proceed() мы попадем в условие else if(status_ == FINISH), в котором объект будет удален.

Чтобы создать самый первый объект класса CallData1M, нужно в функцию Run() добавить строчку:

new CallData1M(&service_, cq_.get());

Код на клиенте


Класс AsyncCallData1M, отвечающий за режим взаимодействия один ко многим на клиенте выглядит следующим образом:

class AsyncClientCall1M : public CommonAsyncClientCall
{
  std::unique_ptr< ClientAsyncReader<HelloReply> > responder;
public:
  AsyncClientCall1M(const HelloRequest& request, CompletionQueue& cq_, std::unique_ptr<Greeter::Stub>& stub_)
  :CommonAsyncClientCall()
  {
    std::cout << "[Proceed1M]: new client 1-M" << std::endl;
    responder = stub_->AsyncGladToSeeMe(&context, request, &cq_, (void*)this);
    callStatus = PROCESS ;
  }
  virtual void Proceed(bool ok = true) override
  {
    if(callStatus == PROCESS)
    {
      if(!ok)
      {
        std::cout << "[Proceed1M]: Trying finish" << std::endl;
        responder->Finish(&status, (void*)this);
        callStatus = FINISH;
        return ;
      }
      responder->Read(&reply, (void*)this);
      printReply("Proceed1M");
    }
    else if(callStatus == FINISH)
    {
      std::cout << "[Proceed1M]: Good Bye" << std::endl;
      delete this;
    }
    return ;
  }
};

Класс AsyncClientCall1M наследуется от CommonAsyncClientCall и содержит респондер соответствующего типа. В конструкторе класса делается запрос к серверу и респондер переходит в состояние PROCESS.

Когда объект этого класса вернется из очереди, его состояние будет PROCESS. В функцию Proceed() передается флаг ok, полученный из очереди, ассоциированный с тэгом. Значение флага ok равное false говорит о том, что сообщений от сервера больше не будет, т.е. на сервере была вызвана функция Finish(). О том, когда на стороне клиента нужно вызывать функцию Finish() говорится в документации:
It is appropriate to call this method when [...] there are no more messages to be received from the server (this can be known implicitly by the calling code, or explicitly from an earlier call to AsyncReaderInterface::Read that yielded a failed result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
что означает примерно следующее: нужно вызывать этот метод [Finish()] когда больше не будет сообщений от сервера (это можно узнать неявно из кода либо явно, когда запрос на чтение (AsyncReaderInterface::Read) вернет неудачный результат, т.е. ok, вернувшийся из очереди cq->Next(&read_tag, &ok) будет равен false).

Если ok равен true, то мы делаем очередной запрос на чтение сообщений, вызывая функцию респондера Read(). По поводу функции Read в документации сказано следующее:
Read a message of type R into msg. Completion will be notified by tag on the associated completion queue.
что означает: [Функция Read()] считывает сообщение типа R в переменную msg. Уведомление о завершении операции чтения будет сделано тэгом из очереди.

После первого вызова функции Read() переменная, содержащая ответ (reply) будет все еще пустой, так как события окончания чтения сообщения еще не произошло.

Когда все сообщения от сервера прочитаны, флаг ok вернется из очереди со значением false, произойдет вызов функции Finish() респондера, после чего респондер перейдет в состояние FINISH. После получения из очереди объекта класса с состоянием FINISH, этот класс будет удален.

Чтобы создать объект класса AsyncClientCall1M, нужно в классе GreeterClient добавить функцию GladToSeeMe():

void GladToSeeMe(const std::string& user)
{
  HelloRequest request;
  request.set_name(user);
  new AsyncClientCall1M(request, cq_, stub_);
}

Консольный вывод сервера и клиента в режиме один ко многим


Консольный вывод сервера для режима один ко многим выглядит следующим образом:

[Proceed1M]: New responder for 1-M mode
[Proceed1M]: New responder for 1-M mode
[Proceed1M]: request message = client
[Proceed1M]: Writing
[Proceed1M]: Writing
[Proceed1M]: Writing
[Proceed1M]: Writing
[Proceed1M]: Trying finish
[Proceed1M]: Good Bye

Сначала создается первый респондер. После того, как получен запрос от клиента, создается второй респондер, который будет обрабатывать следующие запросы. Запрос от клиента содержит сообщение “client”. После этого четыре раза отправляются ответы клиенту, после чего вызывается функция респондера Finish(). При следующем получении объекта в состоянии FINISH из очереди, объект удаляется.
Консольный вывод клиента для режима один ко многим выглядит следующим образом:
[Proceed1M]: new client 1-M
[Proceed1M]: reply message empty
[Proceed1M]: reply message = Hello client!
[Proceed1M]: reply message = I'm very glad to see you!
[Proceed1M]: reply message = Haven't seen you for thousand years.
[Proceed1M]: reply message = I'm server now. Call me later.
[Proceed1M]: Trying finish
[Proceed1M]: Good Bye

Сначала создается новый клиент. После первого вызова функции респондера Read() переменная, содержащая ответ еще не заполнена, поэтому на экран выводится сообщение “reply message empty”. Затем клиент получает от сервера 4 сообщения, после чего из очереди возвращается флаг ok, имеющий значение false. Вызывается функция респондера Finish(). При следующем получении объекта из очереди, объект удаляется.

Режим взаимодействия многие к одному


Код на сервере


Класс CallDataM1, отвечающий за режим взаимодействия многие к одному на сервере выглядит следующим образом:

class CallDataM1: public CommonCallData
{
  ServerAsyncReader<HelloReply, HelloRequest> responder_;
  bool new_responder_created;
public:
  CallDataM1(Greeter::AsyncService* service, ServerCompletionQueue* cq):
  CommonCallData(service, cq), responder_(&ctx_), new_responder_created(false){Proceed();}
  virtual void Proceed(bool ok = true) override
  {
    if(status_ == CREATE)
    {
      std::cout << "[ProceedM1]: New responder for M-1 mode" << std::endl;
      status_ = PROCESS ;
      service_->RequestGladToSeeYou(&ctx_, &responder_, cq_, cq_, this);
    }
    else if(status_ == PROCESS)
    {
      if(!new_responder_created)
      {
        new CallDataM1(service_, cq_);
        new_responder_created = true ;
      }
      //It's time to send reply
      if(!ok)
      {
        std::string greeting("Hello, Client!");
        reply_.set_message(greeting);
        std::cout << "[ProceedM1]: Sending reply" << std::endl;
        status_ = FINISH;
        responder_.Finish(reply_, Status(), (void*)this);
        return ;
      }
      responder_.Read(&request_, (void*)this);
      if(!request_.name().empty())
        std::cout << "[ProceedM1]: request message =" << request_.name() << std::endl;
    }
    else // if(status_ == FINISH)
    {
      std::cout << "[ProceedM1]: Good Bye" << std::endl;
      delete this;
    }
  }
};

Класс CallDataM1 наследуется от базового класса CommonCallData, содержит два члена класса:

  • респондер ServerAsyncReader<HelloReply, HelloRequest>;
  • флаг того, что новый респондер был создан bool new_responder_created

Все члены класса инициализируются в списке инициализации конструктора, после чего вызывается реализация функции Proceed(). В состоянии CREATE точно также далается запрос на обработку следующего запроса клиента с указанием в качестве тэга респондера. После этого респондер переходит в состояние PROCESS. Код в условии else if(status_ == PROCESS) этого класса очень напоминает код класса клиента ClientAsyncCall1M.

На самом деле, принцип работы клиентского класса ClientAsyncCall1M и класса сервера CallDataM1 совершенно одинаков. Сервер читает запросы клиента до тех пор, пока из очереди не будет получен тэг с флагом ok равным false. Точно также, как и в случае клиентского класса в режиме один ко многим, флаг ok со значением false означает то, что сообщений от клиента больше не будет. После того, как флаг ok со значением false получен, вызывается функция респондера Finish и респондер переходит в состояние FINISH. При получении из очереди объекта в состоянии FINISH, объект удаляется.

Чтобы создать самый первый объект класса CallDataM1, нужно в функцию Run() добавить строчку:

new CallDataM1(&service_, cq_.get());


Код на клиенте


Класс AsyncClientCallM1, отвечающий за режим взаимодействия многие к одному на клиенте выглядит следующим образом:

class AsyncClientCallM1 : public CommonAsyncClientCall
{
  std::unique_ptr< ClientAsyncWriter<HelloRequest> > responder;
  unsigned mcounter;
  bool writing_mode_;
public:
  AsyncClientCallM1(CompletionQueue& cq_, std::unique_ptr<Greeter::Stub>& stub_):
  CommonAsyncClientCall(), mcounter,writing_mode_(true)
  {
    std::cout << "[ProceedM1]: new client M-1" << std::endl;
    responder = stub_->AsyncGladToSeeYou(&context, &reply, &cq_, (void*)this);
    callStatus = PROCESS ;
  }
  virtual void Proceed(bool ok = true) override
  {
    if(callStatus == PROCESS)
    {
      if(writing_mode_)
      {
        static std::vector<std::string> greeting = {"Hello, server!",
          "Glad to see you!",
          "Haven't seen you for thousand years!",
          "I'm client now. Call me later."};
        if(mcounter < greeting.size())
        {
          HelloRequest request;
          request.set_name(greeting.at(mcounter));
          std::cout << "[ProceedM1]: Writing" << std::endl;
          responder->Write(request, (void*)this);
          ++mcounter ;
        }
        else
        {
          responder->WritesDone((void*)this);
          std::cout << "[ProceedM1]: changing state to reading" << std::endl;
          writing_mode_ = false;
          return;
        }
      }
      else//reading mode
      {
        std::cout << "[ProceedM1]: trying finish" << std::endl;
        responder->Finish(&status, (void*)this);
        callStatus = FINISH ;
      }
    }
    else if(callStatus == FINISH)
    {
      assert(!reply.message().empty());
      printReply("ProceedM1");
      std::cout << "[ProceedM1]: Good Bye" << std::endl;
      delete this;
    }
    return ;
  }
};

Класс AsyncClientCallM1 наследуется от класса CommonAsyncClientCall, имеет соответствующий респондер и счетчик сообщений. Помимо этих двух членов, класс AsyncClientCallM1 имеет флаг состояния. Принцип работы функции Proceed(), в состоянии PROCESS в режиме записи (условие if(writing_mode_)) совпадает с принципом работы функции Proceed() класса CallData1M с той разницей, что здесь после отправки всех сообщений вызывается функция респондера WritesDone(), тогда как в классе CallData1M вызывается функция Finish(). После вызова функции респондера WritesDone(), клиент переходит в состояние чтения (writing_mode_ = false). В состоянии чтения вызывается функция респондера Finish() и респондер переходит в состояние FINISH.

Здесь стоит обратить внимание на то, что здесь помимо вызова функции респондера WritesDone() происходит вызов функции респондера Finish(). В документации по этому поводу сказано следующее:
It is appropriate to call this method when [...] the client side has no more message to send (this can be declared implicitly by calling this method, or explicitly through an earlier call to the WritesDone method of the class in use, e.g. ClientAsyncWriterInterface::WritesDone or ClientAsyncReaderWriterInterface::WritesDone)
что означает примерно следующее: Нужно вызывать этот метод (Finish()) когда клиент не имеет больше сообщений, которые нужно отправить серверу (что может быть объявлено неявно, просто вызвав этот метод, либо явно путем вызова метода WritesDone()). Но поскольку класс AsyncClientCallM1 отправляет серверу несколько сообщений, вызывая метод Write(), то метод респондера WritesDone() должен быть вызван явно, чтобы информировать сервер о том, что больше сообщений не будет (в отличии от класса AsyncClientCall1M, который отправляет только один запрос серверу при вызове функции AsyncGladToSeeMe()). Поэтому после вызова метода WritesDone() респондер переходит в состояние чтения и уже в нем вызывается функция Finish(). При получении из очереди объекта в состоянии FINISH, ответ от сервера выводится на экран и объект удаляется.

Чтобы создать объект класса AsyncClientCallM1, нужно в классе GreeterClient добавить функцию GladToSeeYou():

void GladToSeeYou()
{
  new AsyncClientCallM1(cq_, stub_);
}

Консольный вывод сервера и клиента в режиме многие к одному


Консольный вывод сервера для режима многие к одному выглядит следующим образом:

[ProceedM1]: New responder for M-1 mode
[ProceedM1]: New responder for M-1 mode
[ProceedM1]: request message = Hello, server!
[ProceedM1]: request message = Glad to see you!
[ProceedM1]: request message = Haven't seen you for thousand years!
[ProceedM1]: request message = I'm client now. Call me later.
[ProceedM1]: Sending reply
[ProceedM1]: Good Bye

Сначала создается первый респондер. После того, как получен запрос от клиента, создается второй респондер, который будет обрабатывать следующие запросы. Затем считываются четыре соообщения от клиента. Запрос сообщений происходит функцией респондера Read(). Точно также, как в случае с клиентским классом AsyncClientCall1M при первом чтении переменной сообщений она окажется пустой. После чтения четырех сообщений из очереди возвращается флаг ok, имеющий значение false, что означает, что клиент вызвал функцию WritesDone(). Вызывается функция респондера Finish() и отправляется ответ клиенту. При следующем получении объекта в состоянии FINISH из очереди, объект удаляется.

Консольный вывод клиента для режима многие к одному выглядит следующим образом:

[ProceedM1]: new client M-1
[ProceedM1]: Writing
[ProceedM1]: Writing
[ProceedM1]: Writing
[ProceedM1]: Writing
[ProceedM1]: changing state to reading
[ProceedM1]: trying finish
[ProceedM1]: reply message = Hello, Client!
[ProceedM1]: Good Bye

Сначала создается новый клиент. Отправляeтся четыре запроса серверу, после чего вызывается функция WritesDone() и клиент переходит в состояние чтения. При получении объекта из очереди в состоянии чтения (PROCESS), вызывается функция респондера Finish() и клиент переходит в состояние FINISH. При получении объекта из очереди в состоянии FINISH, на экран выводится текст ответа от сервера и объект удаляется.

Режим взаимодействия многие к многим


Код на сервере


Класс CallDataMM, отвечающий за режим взаимодействия многие ко многим на сервере выглядит следующим образом:

class CallDataMM: public CommonCallData
{
  ServerAsyncReaderWriter<HelloReply, HelloRequest> responder_;
  unsigned mcounter;
  bool writing_mode_;
  bool new_responder_created;
public:
  CallDataMM(Greeter::AsyncService* service, ServerCompletionQueue* cq):
  CommonCallData(service, cq), responder_(&ctx_), mcounter, writing_mode_(false), new_responder_created(false){Proceed();}
  virtual void Proceed(bool ok = true) override
  {
    if(status_ == CREATE)
    {
      std::cout << "[ProceedMM]: New responder for M-M mode" << std::endl;
      status_ = PROCESS ;
      service_->RequestBothGladToSee(&ctx_, &responder_, cq_, cq_, this);
    }
    else if(status_ == PROCESS)
    {
      if(!new_responder_created)
      {
        new CallDataMM(service_, cq_);
        new_responder_created = true ;
      }
      if(!writing_mode_)//reading mode
      {
        if(!ok)
        {
          writing_mode_ = true;
          ok = true;
          std::cout << "[ProceedMM]: changing state to writing" << std::endl;
        }
        else
        {
          responder_.Read(&request_, (void*)this);
          if(!request_.name().empty())
          std::cout << "[ProceedMM]: request message =" << request_.name() << std::endl;
        }
      }
      if(writing_mode_)//writing mode
      {
        static std::vector<std::string> greeting = {std::string(prefix + "client" "!"),
        "I'm very glad to see you!",
        "Haven't seen you for thousand years.",
        "How are you?",
        "I'm server now. Call me later."};
        if(!ok || mcounter >= greeting.size())
        {
          std::cout << "[ProceedMM]: Trying finish" << std::endl;
          status_ = FINISH;
          responder_.Finish(Status(), (void*)this);
        }
        else
        {
          reply_.set_message(greeting.at(mcounter));
          responder_.Write(reply_, (void*)this);
          ++mcounter;
        }
      }
    }
    else // if(status_ == FINISH)
    {
      std::cout << "[ProceedMM]: Good Bye" << std::endl;
      delete this;
    }
  }
};

Класс CallDataMM наследуется от базового класса CommonCallData, содержит четыре члена класса:

  • респондер ServerAsyncReaderWriter<HelloReply, HelloRequest&gt;
  • счетчик отправленных сообщений unsigned mcounter;
  • флаг состояния чтения/записи bool writing_mode_;
  • флаг того, что новый респондер был создан bool new_responder_created

Все члены класса инициализируются в списке инициализации конструктора, после чего вызывается реализация функции Proceed(). В состоянии CREATE точно также далается запрос на обработку следующего запроса клиента с указанием в качестве тэга респондера. После этого респондер переходит в состояние PROCESS. Код функции Proceed() в состоянии PROCESS состоит из двух частей — чтения сообщений от клиента и отправка сообщений клиенту. Можно сказать, что это объединение двух классов: CallData1M для состояние записи и CallDataM1 для состояние чтения. Начнем с состояния чтения if(!writing_mode_). Точно также как в классе CallDataM1 мы считываем сообщения от клиента до тех пор, пока из очереди не придет флаг ok со значением false. Когда флаг ok со значением false получен из очереди, респондер переключается в состояние записи writing_mode_ = true;. Обратите внимание, что работа респондера продолжается, но уже в состоянии записи. Так происходит потому, что респондер до сих пор не сделал ни одного запроса на добавление события в очередь событий.

В состоянии записи мы отправляем сообщения клиенту ровно (greeting.size()-1) раз. После этого вызывается функция респондера Finish() и респондер переходит в состояние FINISH. При получении объекта из очереди в состоянии FINISH, объект удаляется.

Чтобы создать самый первый объект класса CallDataMМ, нужно в функцию Run() добавить строчку:

new CallDataMМ(&service_, cq_.get());

Код на клиенте


Класс AsyncClientCallMM, отвечающий за режим взаимодействия многие ко многим на клиенте выглядит следующим образом:

class AsyncClientCallMM : public CommonAsyncClientCall
{
  std::unique_ptr< ClientAsyncReaderWriter<HelloRequest,HelloReply> > responder;
  unsigned mcounter;
  bool writing_mode_;
public:
  AsyncClientCallMM(CompletionQueue& cq_, std::unique_ptr<Greeter::Stub>& stub_):
  CommonAsyncClientCall(), mcounter, writing_mode_(true)
  {
    std::cout << "[ProceedMM]: new client M-M" << std::endl;
    responder = stub_->AsyncBothGladToSee(&context, &cq_, (void*)this);
    callStatus = PROCESS ;
  }
  virtual void Proceed(bool ok = true) override
  {
    if(callStatus == PROCESS)
    {
      if(writing_mode_)
      {
        static std::vector<std::string> greeting = {"Hello, server!",
        "Glad to see you!",
        "Haven't seen you for thousand years!",
        "I'm client now. Call me later."};
        //std::cout << "[ProceedMM]: mcounter = " << mcounter << std::endl;
        if(mcounter < greeting.size())
        {
          HelloRequest request;
          request.set_name(greeting.at(mcounter));
          responder->Write(request, (void*)this);
          ++mcounter;
        }
        else
        {
          responder->WritesDone((void*)this);
          std::cout << "[ProceedMM]: changing state to reading" << std::endl;
          writing_mode_ = false;
        }
        return ;
      }
      else //reading mode
      {
        if(!ok)
        {
          std::cout << "[ProceedMM]: trying finish" << std::endl;
          callStatus = FINISH;
          responder->Finish(&status, (void*)this);
          return;
        }
        responder->Read(&reply, (void*)this);
        printReply("ProceedMM");
      }
      return;
    }
    else if(callStatus == FINISH)
    {
      std::cout << "[ProceedMM]: Good Bye" << std::endl;
      delete this;
    }
  }
};

Класс AsyncClientCallMM наследуется от класса CommonAsyncClientCall, имеет соответствующий респондер, счетчик сообщений и флаг состояния чтения/записи. Как и в случае с классом CallDataMM на сервере, код функции Proceed() класса AsyncClientCallMM в состоянии PROCESS состоит из двух частей: отправки запросов на сервер и чтение ответов с сервера. Можно сказать, что это объединение двух классов: класса AsyncClientCallM1 для состояния записи и AsyncClientCall1M для состояния чтения.

Начнем с режима записи запросов на сервер if(writing_mode_). Сначала на сервер отправляется ровно (greeting.size() — 1) запросов, после чего вызывается функция респондера WritesDone() и респондер переходит в состояние чтения. В этом месте смена состояния сопровождается выходом из функции Proceed(), в отличии от класса CallDataMM на сервере, где работа функции продолжается. В состоянии чтения, чтение сообщений от сервера происходит до тех пор, пока из очереди не вернется флаг ok со значением false. Когда флаг ok со значением false получен из очереди, вызывается функция респондера Finish() и респондер переходит в состояние FINISH. При получении объекта из очереди в состоянии FINISH, объект удаляется.

Чтобы создать объект класса AsyncClientCallMM, нужно в классе GreeterClient добавить функцию BothGladToSee():

void BothGladToSee()
{
  new AsyncClientCallMM(cq_, stub_);
}

Консольный вывод сервера и клиента в режиме многие ко многим


Консольный вывод сервера для режима многие ко многим выглядит следующим образом:

[ProceedMM]: New responder for M-M mode
[ProceedMM]: New responder for M-M mode
[ProceedMM]: request message = Hello, server!
[ProceedMM]: request message = Glad to see you!
[ProceedMM]: request message = Haven't seen you for thousand years!
[ProceedMM]: request message = I'm client now. Call me later.
[ProceedMM]: changing state to writing
[ProceedMM]: Trying finish
[ProceedMM]: Good Bye

Сначала создается первый респондер. После того, как получен запрос от клиента, создается второй респондер, который будет обрабатывать следующие запросы. Затем считываются четыре соообщения от клиента. Запрос на чтение сообщений происходит функцией респондера Read(). После чтения четырех сообщений из очереди возвращается флаг ok, имеющий значение false, что означает, что клиент вызвал функцию WritesDone(). Респондер меняет состояние чтения на состояние записи. Клиенту отправляются пять сообщений, после чего вызывается функция респондера Finish() и респондер переходит в состояние FINISH. При следующем получении объекта в состоянии FINISH из очереди, объект удаляется.

Консольный вывод клиента для режима многие ко многим выглядит следующим образом:

[ProceedMM]: new client M-M
[ProceedMM]: changing state to reading
[ProceedMM]: reply message empty
[ProceedMM]: reply message = Hello client!
[ProceedMM]: reply message = I'm very glad to see you!
[ProceedMM]: reply message = Haven't seen you for thousand years.
[ProceedMM]: reply message = How are you?
[ProceedMM]: reply message = I'm server now. Call me later.
[ProceedMM]: trying finish
[ProceedMM]: Good Bye

Сначала создается новый клиент. Отправляeтся четыре запроса серверу, после чего вызывается функция WritesDone() и клиент переходит в состояние чтения. В состоянии чтения клиент получает пять сообщений от сервера, после чего из очереди возвращается флаг ok со значением false, что соответствует вызову на сервере функции респондера Finish(). Вызывается функция респондера Finish() и клиент переходит в состояние FINISH. При получении объекта из очереди в состоянии FINISH объект удаляется.

6. Заключение и список использованных источников


В целом, вниманительно изучив комментарии к коду из примера в документации, разобравшись с тестами в репозитории gRPC и вдумчиво покопавшись в документации можно во всем разобраться самому. Именно так и было в моем случае. Но хочу добавить, что с самого начала и, в дальнейшем, на разных этапах знакомства с асинхронными режимами фреймворка, я искал подобную статью. Искал и не нашел, в итоге, написал такую статью сам. Если для кого-нибудь эта статья окажется полезной, это будет лучшей наградой за все приложенные при написании усилия!

Cписок использованных источников:


Поделиться публикацией
AdBlock похитил этот баннер, но баннеры не зубы — отрастут

Подробнее
Реклама
Комментарии 0

Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.