Pull to refresh

Удобный двоичный источник данных взамен Stream

Reading time9 min
Views8.3K
Как обычно получают двоичные данные ваши .NET-компоненты? Если отбросить примитивный случай когда все данные уже в байтовом массиве, то уверен что в виде System.IO.Stream. В общем случае он позволяет собственно только одну операцию — считать в указанный байтовый массив (буфер) указанное количество байтов. При выполнении чтения с помощью этой операции возникают два вида затруднений и одно нерациональное использование ресурсов.

Затруднение номер один: если данные одного и того же источника нужны в нескольких компонентах, то после того как один компонент считал какие то данные из Stream, то он их «потребил», и другим компонентам они уже никак не достанутся. Затруднение номер два: данные нам нужны в виде некоторых блоков, а в результате чтения блок может оказаться в буфере лишь частично (только три байта 32-х битного числа, только половина букв слова и т.д.). Нерациональное использование ресурсов возникает из-за того, что каждый читающий данные компонент должен создавать свой собственный буфер для чтения. Далее я предлагаю простое в использовании решение указанных затруднений, которое позволит очистить ваш код, получить высокую производительность чтения и получать универсальные компоненты.

Сначала рассмотрим как мы можем решить указанные затруднения не отказываясь от использования Stream.

Проблема неполной доступности блоков информации принципиально решаема, но ценой возникновения других проблем. Решение первое — читать данные по одному байту пока не наберём нужное количество, чревато критическим падением производительности. Решение второе — перед каждым обращением к буферу проверять, не достигли ли мы его границы и если достигли, то повторять чтение. Проверки, чтение Stream и последующая коррекция индекса в буфере перед каждым обращением сильно засоряют код и провоцируют многочисленные ошибки. Это можно вынести в отдельный метод, очистив смысловой код, но каждый компонент всё равно придется отдельно снабжать этим методом.

Проблема совместного чтения одного Stream несколькими компонентами нерешаема принципиально, особенно в условиях когда для высокой производительности чтение происходит сразу большими кусками. Всё, что можно придумать, это подавать Stream на вход только одному компоненту, а остальным данные передавать уже другими, нестандартными способами. Такой подход не позволяет создавать универсальные компоненты и убивает на корню какую-либо их композицию или замену.

Судя по анализу некоторых проектов, обходу этих двух затруднений бывает посвящена значительная часть кода. И более того, смысловая нагрузка проектов иногда теряется в дебрях кода, обеспечивающего чтение источника данных.

Уточнение, потребовавшееся по результатам анализа комментариев к статье. Моё предложенное далее решение не меняет модель использования источника данных. Другие модели, когда не потребитель запрашивает данные, а источник проталкивает их в виде уведомлений, не относятся к теме статьи, и могут быть реализованы и использованы независимо от моего решения или совместно с ним.

Для принципиального решения указанных затруднений необходимо принять принципиальное решение: отказаться от Stream, заменив его на аналог (назовём его BufferedSource), который будет снабжён буфером, доступным всем компонентам.
В нашем BufferedSource метод чтения заменяем на два: собственно заполнение буфера и пропуск (потребление, удаление) использованных данных из буфера. И, конечно же, добавляем метод, который удостоверяется, что в буфере источника содержится не меньше указанного количества данных и, при необходимости, дочитывает их в буфер.

public interface IBufferedSource
{
// буфер в виде сегмента байтового массива, в котором содержаться текущие данные источника
byte[] Buffer { get; } 
int Offset { get; }
int Count { get; }
// Запрашивает необходимое количество байтов в буфере, бросает исключение если источник исчерпался раньше времени.
void EnsureBuffer (int size);
// пропуск потреблённых данных из начала буфера
void SkipBuffer (int size);
}


Сразу после того, как я начал использовать такой интерфейс на практике, возникла потребность добавить в него ещё свойств и методов. Для источников, поставляющих данные порциями определённого размера (например, криптографические преобразования), возникла потребность в методе считывания в буфер максимально возможного количества данных, не зная какими порциями источник может их предоставлять. Для уменьшения количества вызовов методов без необходимости (особенно актуально для асинхронных методов) понадобилось добавить свойство, показывающее, что источник считал в буфер все имеющиеся данные и больше ничего туда добавить не сможет. Для оптимизации чтения в случае, когда большое количество данных не используется, а просто пропускается, понадобился метод пропуска, пропускающий не только данные буфера, но и дальнейшие данные. Такой пропуск позволяет экономить на обработке данных источником при чтении, а также использовать возможность быстрого позиционирования, которая есть у источников типа файл или память.

После реализации многих источников разных типов и потребителей для них, я пришёл к такому виду интерфейса, который содержит только то необходимое, что нельзя реализовать снаружи источника. Все дополнительные потребности по мере возникновения удовлетворяются в методах расширения.

public interface IBufferedSource
{
// буфер в виде сегмента байтового массива, в котором содержаться текущие данные источника
byte[] Buffer { get; }
int Offset { get; }
int Count { get; }
// Признак исчерпания источника.
bool IsExhausted { get; }
// Заполняет буфер данными источника, дополняя уже доступные там данные. Возвращает размер доступных в буфере данных.
int FillBuffer ();
// Запрашивает у источника указанное количество данных в буфере. Бросает исключение если источник исчерпался раньше времени.
void EnsureBuffer (int size);
// Пропускает указанное количество данных из начала доступных данных буфера.
void SkipBuffer (int size);
// Пытается пропустить указанное количество данных источника, включая доступные в буфере данные. Возвращает количество пропущеных байтов данных.
long TrySkip (long size);
}


Методы EnsureBuffer() и SkipBuffer() являются критически важными по быстродействию, потому что потребитель данных будет вызывать их часто. В подавляющем большинстве вызовов они должны делать только примитивную арифметику с индексом и размером в буфере. Поэтому мы их не объединяем с FillBuffer() и TrySkip(), которые наоборот, в большинстве случаев будут инициировать ресурсоёмкое считывание новых данных.

Некоторые необходимые принципы работы источника, реализующего IBufferedSource, невозможно описать в терминах интерфейса, поэтому описываю их в XML-комментариях и контрактах. Самый главный принцип — свойство Buffer является неизменным всё время жизни источника. Согласно второму принципу свойство IsExhausted меняет своё значение из False в True только один раз за время жизни источника, а значение True уже измениться не может. Третий принцип разрешает методу FillBuffer() вернуть ноль только тогда, когда IsExhausted=True и Count=0. Дополнительное пожелание, соответсвующее изначальным целям экономии на дублировании буферов: источники данных не создают никаких буферов, а принимают готовые в конструкторе.

Учитывая все требования, переходим к конкретным реализациям. Изначально мы заменяем Stream, поэтому первой реализацией стал очень простой ArrayBufferedSource, являющийся аналогом MemoryStream. Следующими реализациями интерфейса стали BufferedSource-источник, получающий данные из Stream (StreamBufferedSource) и наоборот: Stream, получающий данные из BufferedSource-источника (BufferedSourceStream). В продолжении темы замены Stream создал CryptoTransformingBufferedSource, который является аналогом CryptoStream, то есть берёт данные из другого источника и применяет к ним указанное крипто-преобразование. Далее создал источник-транслятор ObservableBufferedSource, который просто дублирует другой источник, но при каждом потреблении данных отправляет уведомления IProgress-получателю, что позволяет создавать индикацию процессов потребления. Более сложным транслятором стал SizeLimitedBufferedSource, ограничивающий другой источник указанным размером. Для более удобной миграции из Stream добавил для IBufferedSource методы расширения Read(байтовый_массив), CopyTo(поток_для_записи), ReadAllBytes() и ReadAllText(кодировка). Всё это выложено в открытый репозиторий на github.

Как создавать потребителей IBufferedSource-источника? Вот, например, разбор AVI видео клипа, который сохранён в файле в кодировке «base-64».

static void Main (string[] args)
{
  using (var stream = new System.IO.FileStream (@"c:\test.avi.base64", FileMode.Open, FileAccess.Read))
  {
    var fileSource = new StreamBufferedSource (stream, new byte[1024]);
    var transform = new System.Security.Cryptography.FromBase64Transform ();
    var aviSource = new CryptoTransformingBufferedSource (fileSource, transform, new byte[1024]);
    ParseAvi (aviSource);
  }
}
void ParseAvi (IBufferedSource source)
{
  do
  {
    source.EnsureBuffer (8);
    var id = System.Text.Encoding.ASCII.GetString (source.Buffer, source.Offset, 4);
    var size = (long)BitConverter.ToUInt32 (source.Buffer, source.Offset + 4);
    source.SkipBuffer (8);
    if (id == "avih")
    {
      var chunkSource = new SizeLimitedBufferedSource (source, size);
      ParseAvihChunk (chunkSource);
    }
    source.TrySkip (size);
  } while (!source.IsEmpty ());
}
void ParseAvihChunk (IBufferedSource source)
{
  source.EnsureBuffer (56);
  var microSecPerFrame = BitConverter.ToUInt32 (source.Buffer, source.Offset);
  var flags = BitConverter.ToUInt32 (source.Buffer, source.Offset + 12);
  var totalFrames = BitConverter.ToUInt32 (source.Buffer, source.Offset + 16);
  var width = BitConverter.ToUInt32 (source.Buffer, source.Offset + 32);
  var height = BitConverter.ToUInt32 (source.Buffer, source.Offset + 36);
}


Не останавливаясь на достигнутом, предлагаю дальнейшее развитие IBufferedSource для источников, представляющих из себя коллекцию блоков информации. Например, файлы AVI или MKV представляют из себя отдельные порции, каждую из которых удобно считать отдельным источником данных. Реализация интерфейса IPartitionedBufferedSource позволит рассматривать единый источник как источник одной порции с возможностью перехода на следующую.

public interface IPartitionedBufferedSource : IBufferedSource
{
// Пытается пропустить все данные источника, принадлежащие текущей части, чтобы стали доступны данные следующей части.
bool TrySkipPart ();
}


Обращаю внимание, что в IPartitionedBufferedSource меняется семантика наследованного свойства IsExhausted. Теперь оно означает исчерпание одной части/порции/блока, что не исключает перехода на следующую.

Сразу представляю две реализации IPartitionedBufferedSource. TemplateSeparatedBufferedSource позволяет считывать источник в виде отдельных частей, которые отделены друг от друга фиксированным разделителем. Например отдельные поля HTTP-заголовка отделены друг от друга двумя байтами 13 и 10. Базовый абстрактный класс EvaluatorPartitionedBufferedSourceBase позволяет путём его наследования определить свою функцию, которая произвольным образом находит границу между частями. Например, я его использовал для считывания отдельных частей в композитном почтовом MIME-сообщении. Обе упомянутые реализации вы найдёте в том же репозитории на github.

Теперь попробуем с использованием всех созданных классов создать разбор довольно сложной структуры: электронного почтового сообщения. Сообщение содержит коллекцию заголовков и коллекцию частей, каждая из которых начинается с коллекции своих заголовков. Обратите внимание, что для реализации разбора разных сущностей получаются совершенно независимые методы, получающие на вход IBufferedSource-источник.

static void Main (string[] args)
{
  using (var fs = new System.IO.FileStream (@"c:\message.eml", FileMode.Open, FileAccess.Read))
  {
    var fileSource = new StreamBufferedSource (fs, new byte[1024]);
    ParseMultipartMessage (fileSource);
  }
}
void ParseMultipartMessage (IBufferedSource source)
{
  var headerSource = new TemplateSeparatedBufferedSource (source, new byte[] { 0x0d, 0x0a, 0x0d, 0x0a });
  var fieldSource = new HeaderFieldSource (headerSource);
  do
  {
    var field = ParseField (fieldSource);
  } while (fieldSource.TrySkipPart ());
  headerSource.TrySkipPart ();
  
  var bodyPartsSource = new BodyPartSource ("boundary--", source);
  while (bodyPartsSource.TrySkipPart ())
  {
    var entity = ParseEntity (bodyPartsSource);
  }
}
void ParseEntity (IBufferedSource source)
{
  var headerSource = new TemplateSeparatedBufferedSource (source, new byte[] { 0x0d, 0x0a, 0x0d, 0x0a });
  var fieldSource = new HeaderFieldSource (headerSource);
  do
  {
    var field = ParseField (fieldSource);
  } while (fieldSource.TrySkipPart ());
  headerSource.TrySkipPart ();
  var body = source.ReadAllBytes ();
}

пояснения к коду
Классы HeaderFieldSource и BodyPartSource являются наследниками EvaluatorPartitionedBufferedSourceBase, потому что использовать TemplateSeparatedBufferedSource невозможно ввиду неоднозначности разделителя отдельных частей. В частности, для заголовка разделителями отдельных полей является не любой перевод строки, а только тот, после которого идут непробельные символы. Для отдельных частей сообщения частю разделителя (указанного в заголовке bounday) могут быть (а могут и не быть) идущие следом переводы строки.


Следующим шагом будет добавление в наш BufferedSource асинхронных методов, но тема эта не простая, поэтому оставлю её для следующих статей.

Подводя итог, позволю себе выразить удовлетворённость результатом. Созданный BufferedSource успешно заменяет и решает все проблемы работы со Stream. При этом переделка существующих компонентов, использующих Stream, может осуществляться постепенно, используя аналогичные методы BufferedSource и классы-переходники.

Позже проект переконфигурирован в виде Portable Class Library Profile328 (.NET Framework 4, Silverlight 5, Windows 8, Windows Phone 8.1, Windows Phone Silverlight 8), добавлены тесты и создан nuget-пакет.
Tags:
Hubs:
Total votes 11: ↑7 and ↓4+3
Comments41

Articles