Pull to refresh

Обмен сообщениями между ролями и экземплярами

Reading time5 min
Views1.4K
Сегодня вашему вниманию представляется вторая статья из цикла статей об архитектурных решениях сервиса AtContent.com.

В процессе работы над сервисом AtContent.com у нас возникла задача о синхронизации между экземплярами внутри роли, а также между ролями. Стандартные средства из SDK не позволяют решить эту проблему двумя строчками кода. Поэтому мы разработали собственное решение для коммуникации между экземплярами. Оно позволяет выполнять задания на всех экземплярах роли сразу, либо на одном экземпляре, который выбирается определенным образом либо случайно.


Суть решения заключается в коммуникации между экземплярами роли по HTTP. Для того, чтобы делать это безопасно нужно создать Endpoint в настройках роли.


Рис 1. Создание Endpoint

Тип точки – Internal, и указать какой-нибудь незанятый Private port. Это нужно для того, чтобы обмен происходил внутри дата центра. После добавления можно обращаться к этой точке по протоколу HTTP и отправлять запросы.

Чтобы обрабатывать запросы на экземпляре необходимо в Web Role добавить Generic Handler. Здесь это Communicator.ashx


Рис 2. Создание Generic Handler

В принципе уже сейчас можно устраивать обмен сообщениями, но предстоит решить ещё несколько проблем.

Первая проблема – это безопасность. Добавленный ранее обработчик доступен и по внешнему адресу и порту. Поэтому необходимо защитить его на входе. Самым простым способом является добавление в POST-запрос параметра, который будет выполнять роль ключа.

Вторая проблема – это обращение к Internal Endpoint для экземпляра. Для её решения SDK предлагает замечательный инструмент – ServiceRuntime. Чтобы им воспользоваться нужно подключить соответсвующие пространства имен:
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;

После этого мы можем выбирать нужные нам экземпляры роли и отправить на них POST-запросы:
//Формируем набор параметров
var Params = new Dictionary<string, string>();
Params["key"] = "key_value";

//Кодируем его для POST-запроса
string STDIN = "";
foreach (var Item in Params)
{
    STDIN += Item.Key + "=" + HttpUtility.UrlEncode(Item.Value, Encoding.UTF8) + "&";
}
byte[] sentData = Encoding.UTF8.GetBytes(STDIN);

foreach (RoleInstance roleInst in RoleEnvironment.Roles["WebRole1"].Instances)
{
    //Создаем WebRequest для отправки запроса
    WebRequest reqPOST = WebRequest.Create(
        "http://" + roleInst.InstanceEndpoints["InterEndpoint"].IPEndpoint + "/Communicator.ashx");
    reqPOST.Method = "POST";
    reqPOST.Timeout = 120000;
    reqPOST.ContentType = "application/x-www-form-urlencoded";

    //Отправляем запрос
    reqPOST.ContentLength = sentData.Length;
    System.IO.Stream sendStream = reqPOST.GetRequestStream();
    sendStream.Write(sentData, 0, sentData.Length);
    sendStream.Close();

    System.Net.WebResponse resp = reqPOST.GetResponse();
    System.IO.Stream stream = resp.GetResponseStream();
    System.IO.StreamReader sr = new StreamReader(stream);
    string s = sr.ReadToEnd();
}

Данный пример показывает как отправить запросы сразу на все экземпляры роли «WebRole1». В Dictionary Params можно добавить дополнительные параметры, которые будут участвовать в обработке задачи.

Если же требуется отправить задание на определенный экземпляр, или на случайный – то просто выбираем экземпляр из RoleEnvironment.Roles[«WebRole1»].Instances по нужному нам правилу и отправляем запрос только на него.

При этом в Communicator.ashx нам остается обработать данный POST-запрос и выполнить необходимые команды:
//Не забываем про безопасность
if (context.Request.Form["key"] != "key_value") return;            
//Обрабатываем запрос

Самым распространенным сценарием в нашем сервисе является изменение кэша экземпляра.

С отправкой сообщения на экземпляр сервиса (Worker Role) ситуация обстоит несколько сложнее. Стандартным сценарием взаимодействия с ним является использование очередей (Queue). Но это не означает, что нельзя взаимодействовать с экземплярами сервиса по HTTP. Для этого необходимо также как и для экземпляра приложения добавить Endpoint с типом Internal и укзать для него Private port. И при старте экземпляра сервиса нужно добавить HTTPListener, который будет обрабатывать поступающие POST-запросы.
private HttpListener listener = null;
private AutoResetEvent connectionWaitHandle = new AutoResetEvent(false);

public override void Run()
{
    //В момент запуска создаем трэд для HttpListener
    Thread HttpListenerThread = null;
    while (true)
    {
        if (HttpListenerThread == null)
        {
            HttpListenerThread = new Thread(new ThreadStart(HttpListenerHandler));
            HttpListenerThread.IsBackground = true;
            HttpListenerThread.Start();
        }
        Thread.Sleep(1000);
    }
}

protected void HttpListenerHandler()
{
    //Создание и настройка HttpListener для обработки запросов с ServiceEndpoint
    if (listener == null)
    {
        listener = new HttpListener();
        var HostEndpoint = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["ServiceEndpoint"].IPEndpoint.ToString();
        listener.Prefixes.Add(string.Format("http://{0}/", HostEndpoint));
        listener.Start();
    }
    while (true)
    {
        IAsyncResult result = listener.BeginGetContext(HandleAsyncConnection, listener);
        connectionWaitHandle.WaitOne();
    }
}

private void HandleAsyncConnection(IAsyncResult result)
{
    //Обработка POST-запроса
    HttpListener listener = (HttpListener)result.AsyncState;
    HttpListenerContext context = listener.EndGetContext(result);
    connectionWaitHandle.Set();
    var Request = context.Request;
    var Response = context.Response;
    if (Request.HttpMethod == "POST")
    {
        Stream BodyStream = context.Request.InputStream;
        var encoding = context.Request.ContentEncoding;
        var reader = new StreamReader(BodyStream, encoding);
        var PostParams = HttpUtility.ParseQueryString(reader.ReadToEnd(), encoding);
        if (PostParams["key"] != "key_value") return;
        //Обрабатываем запрос
    }
    Response.OutputStream.Close();
}

Здесь приведен код класса WorkerRole.cs, точнее той его части, которая подверглась изменениям. Следует заметить, что для обработки запросов нам понадобится пространство имен System.Web. Также необходимо добавить соответсвующий Reference в WorkerRole.

После всего этого мы получили работающий механизм для обращения как к экземплярам внутри одной роли, так и к экземплярам между ролями. Сценариев применения такого взаимодействия можно найти очень много. О некоторых из них я расскажу в следующих статьях из цикла – «Кэширование данных на экземпляре и управление кешированием» и «Эффективное управление обработкой облачными очередями (Queue)».

Описаный в статье механизм с некоторыми доработками будет доступен в составе OpenSource библиотеки CPlase для Windows Azure, которая в скором времени появится в открытом доступе.

Читайте в серии:
Tags:
Hubs:
+7
Comments4

Articles

Change theme settings