Разработчик
0,0
рейтинг
24 ноября 2015 в 15:47

Разработка → Атомарная обработка блоков данных без блокировки из песочницы

Использование алгоритмов без блокировки всегда было чем-то пугающим для разработчика. Очень трудно представить себе организацию доступа к данным без блокировки, таким образом, чтобы два или более потока не могли одновременно обрабатывать один и тот же блок данных. Большинство разработчиков используют стандартные контейнеры типа стеков или связных списков без блокировки, но не более того. В этой же статье я хотел бы рассказать, как организовать доступ к данным в многопоточной среде без блокировки.

Основная идея такого метода заключается в том, что каждый поток использует отдельный буфер, в который копирует данные из основного буфера, обрабатывает их и затем меняет местами указатель на свой буфер с указателем на основной буфер. Рассмотрим следующий код:

#include <stdio.h>

struct data_s {
  int a;
  int b;
};
struct data_s reader_tmp, writer_tmp, data;
struct data_s *reader_tmp_ptr, *writer_tmp_ptr, *data_ptr;
int done = 0;

void process(struct data_s *data) {
  data->a++;
  data->b++;
}
void* writer(void* p) {
  struct data_s *tmp_ptr;
  int i;
  for(i = 0; i < 1000000; i++) {
    do {
      tmp_ptr = data_ptr;
      writer_tmp_ptr->a = tmp_ptr->a;
      writer_tmp_ptr->b = tmp_ptr->b;
      process(writer_tmp_ptr);
    } while(!__sync_compare_and_swap(&data_ptr, tmp_ptr, writer_tmp_ptr));
    writer_tmp_ptr = tmp_ptr;
  }
}

void* reader(void *p) {
  struct data_s *tmp_ptr;
  int a, b;
  while(!done) {
    do {
      tmp_ptr = data_ptr;
      reader_ptr->a = tmp_ptr->a;
      reader_ptr->b = tmp_ptr->b;
      a = tmp_ptr->a;
      b = tmp_ptr->b;
    } while(!__sync_bool_compare_and_swap(&data_ptr, tmp_ptr, reader_tmp_ptr));
    reader_tmp_ptr = tmp_ptr;
    printf(“data = {%d, %d}\n”, a, b);
  }
}

int main() {
  pthread_t reader_thread, writer_thread;
  data.a = 0;
  data.b = 0;
  data_ptr = &data;
  writer_tmp_ptr = &writer_tmp;
  reader_tmp_ptr = &reader_tmp;
  pthread_create(&read_thread, NULL, reader, NULL);
  pthread_create(&write_thread, NULL, writer, NULL);
  pthread_join(write_thread, NULL);
  done = 1;
  pthread_join(read_thread, NULL);
  return 0;
}

В приведённом коде данные перед обработкой копируются из буфера, на который указывает data_ptr в буфер, на который указывает writer_tmp_ptr. А затем эти указатели меняются местами. Причём в data_ptr writer_tmp_ptr записывается с использованием атомарной операции compare_and_swap, которая сравнивает первый аргумент со вторым и если они совпадают, то записывает третий аргумент в первый и возвращает true. Иначе, возвращает false. Для чего это нужно? Рассмотрим на примере функции reader. Допустим поток, выполняющий эту функцию приостановился после строчки a = tmp_ptr->a; В этот момент, tmp_ptr указывает на data. Тут же начал работать поток, выполняющий функцию writer. Выполнив первую итерацию он поменял местами writer_tmp_ptr и data_ptr и начал следующую итерацию, остановившись после строчки data->b++; В данной ситуации writer_tmp_ptr указывает на data и tmp_ptr в функции reader указывает на data. Получается одновременное чтение и модификация одного и того же буфера. Но так как указатели data_ptr и tmp_ptr уже не совпадают, то операция compare_and_swap обнаружит коллизию и выполнит операцию чтения ещё раз. Почему же присваивание reader_tmp_ptr = tmp_ptr не проходит такую проверку?

Всё просто. Переменная reader_tmp_ptr является специфичной переменной для потока, в котором она выполняется. В данном примере я сделал её глобальной, что не совсем правильно, т.к. в случае, с несколькими читающими потоками, пришлось бы заводить ещё одну глобальную переменную для второго потока, и внутри функции определять, какой поток в данный момент выполняется, чтоб использовать ту или иную переменную в качестве уникального для потока указателя на буфер. Оптимальный вариант — это использование т.н. специфичные для потока переменные. Например, библиотека pthread имеет такие замечательные функции, как pthread_getspecific/pthread_setspecific. Целью же написания этого кода было наглядно показать читателю, как работает данный алгоритм. Без оптимизаций, которые могут только запутать представление о самой сути.

Казалось бы, всё идеально, программа должна выводить на экран пары одинаковых значений, но не так всё просто. Представим также, что поток, выполняющий функцию reader, остановился после строчки a = tmp_ptr->a; после чего, поток, выполняющий функцию writer, завершил 2 итерации и выполняет третью. Остановившись после завершения функции process. Далее поток, выполняющий функцию reader возобновляет свою работу. В этой ситуации значения переменных a и b не совпадут, но операция compare_and_swap вернёт true, т.к. data_ptr снова указвает на data, другими словами data_ptr и tmp_ptr снова совпадают. Это называется проблема ABA. Одним из способов решения этой проблемы, является добавление к указателю счётчика, который увеличивается каждый раз, когда ему присваивается новое значение. В следующем примере такая проблема отсутствует.

#include <stdio.h>
#include <stdint.h>
#include <pthread.h>

struct data_s {
  int a;
  int b;
};

struct data_pointer_s {
  union {
    uint64_t qw[2];
    struct {
      struct data_s *data_ptr;
      uint64_t aba_counter;
    };
  };
};

static inline char cas128bit(volatile struct data_pointer_s *a, struct data_pointer_s b, struct data_pointer_s c) {
  char result;
  __asm__ __volatile__(
    "lock cmpxchg16b %1\n\t"
    "setz %0\n"
    : "=q" (result)
    , "+m" (a->qw)
    : "a" (b.data_ptr), "d" (b.aba_counter)
    , "b" (c.data_ptr), "c" (c.aba_counter)
    : "cc"
  );
  return result;
}
struct data_s reader_tmp, writer_tmp, data;
struct data_pointer_s reader_tmp_ptr, writer_tmp_ptr, data_ptr;
int done = 0;

void process(struct data_s *data) {
  data->a++;
  data->b++;
}

void* writer(void* p) {
  struct data_pointer_s tmp_ptr;
  int i;
  for(i = 0; i < 1000000; i++) {
    do {
      tmp_ptr = data_ptr;
      writer_tmp_ptr.data_ptr->a = tmp_ptr.data_ptr->a;
      writer_tmp_ptr.data_ptr->b = tmp_ptr.data_ptr->b;
      process(writer_tmp_ptr.data_ptr);
      writer_tmp_ptr.aba_counter = tmp_ptr.aba_counter + 1;
    } while(!cas128bit(&data_ptr, tmp_ptr, writer_tmp_ptr));
    writer_tmp_ptr = tmp_ptr;
  }
}

void* reader(void *p) {
  struct data_pointer_s tmp_ptr;
  int a, b;
  while(!done) {
    do {
      tmp_ptr = data_ptr;
      reader_tmp_ptr.data_ptr->a = tmp_ptr.data_ptr->a;
      reader_tmp_ptr.data_ptr->b = tmp_ptr.data_ptr->b;
      a = tmp_ptr.data_ptr->a;
      b = tmp_ptr.data_ptr->b;
      reader_tmp_ptr.aba_counter = tmp_ptr.aba_counter + 1;
    } while(!cas128bit(&data_ptr, tmp_ptr, reader_tmp_ptr));
    reader_tmp_ptr = tmp_ptr;
    printf("data = {%d, %d}\n", a, b);
  }
}

int main() {
  pthread_t reader_thread, writer_thread;
  
  data.a = 0;
  data.b = 0;
  data_ptr.data_ptr = &data;
  data_ptr.aba_counter = 0;
  writer_tmp_ptr.data_ptr = &writer_tmp;
  writer_tmp_ptr.aba_counter = 0;
  reader_tmp_ptr.data_ptr = &reader_tmp;
  reader_tmp_ptr.aba_counter = 0;
  
  pthread_create(&reader_thread, NULL, reader, NULL);
  pthread_create(&writer_thread, NULL, writer, NULL);

  pthread_join(writer_thread, NULL);
  done = 1;
  pthread_join(reader_thread, NULL);
  return 0;
}

Следует отметить, что эффективность данного кода зависит от объёма копируемых данных и от сложности функции process. Если требуется атомарная обработка блоков данных, объёмами в несколько десятков мегабайт, то использование мьютексов было бы намного эффективнее. Также неплохо было бы рассмотреть возможность добавления небольшой задержки(порядка нескольких микросекунд) каждый раз после того как compare_and_swap возвращает false, чтоб дать другому потоку возможность закончить операцию. Опять-таки, наличие задержки, и время напрямую будут зависить от специфики выполняемой задачи.

Отдельно хотелось бы выразить благодарность пользователю vladvic за помощь в понимании и представлении того, как действует данный алгорим.
Алексей @Alex_1982
карма
6,0
рейтинг 0,0
Разработчик
Реклама помогает поддерживать и развивать наши сервисы

Подробнее
Спецпроект

Самое читаемое Разработка

Комментарии (4)

  • 0
    «Если требуется атомарная обработка блоков данных, объёмами в несколько десятков мегабайт, то использование мьютексов было бы намного эффективнее.»
    Это очень зависит от того насколько трудоёмка функция обработки данных. Если копирование намного быстрее, то скорее всего более эффективно будет скопировать и освободить указатель.
    И в любом случае флажки-спинлоки в 99% случаев более эффективны нежели мьютексы.
  • +1
    Когда-то также интересовался этой темой. Даже написал свою реализацию lock-free очереди (GitHub, StackExchange).
    Идея там простая: есть 2 потока, один пишет в очередь, другой из нее читает. Внутри очереди есть 2 подочереди — одна для писателя, другая для читателя. Когда читатель вычитывает свою подочередь, он забирает подочередь писателя, а писатель начинает новую.
    В базовам варианте (один читатель, один писатель) для синхронизации используется единственная atomic переменная.
    • 0
      Это понятно, но в данной статье я специально не стал затрагивать тему lock-free стэков и очередей, т.к. на эту тему информации в сети можно найти сколько угодно. Тут была именно попытка реализовать что-то наподобие критических секций, без использования медленных мьютексов.
  • 0
    Просто как вариант.
    Если гарантированно, что писатель один, а читателей сколько угодно, то см. «Non-blocking Write Protocol» в Real-Time Systems: Design Principles for Distributed Embedded Applications. Решается одной atomic переменной, как указал HaronK
    Формат данных не важен (очередь, стек, и т.п. — просто блок данных)

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.