четверг, 25 сентября 2008 г.

Asynchronous Stream Copying

Не так давно возникла необходимость копирования данных неопределенного размера в из одного потока данных (System.IO.Stream) в другой максимально быстрым образом. Речь шла о передаче больших файлов по локальной сети.

Самый простой способ выполнить требуюмую операцию - в цикле читать кусок данных из source потока и затем писать его в destination поток. При таком подходе время копирования данных складывается из суммарного времени чтения и суммарного времени записи, однако, теоретический нижний предел выполнения такой операции - суммарное время работы с самым медленным потоком.

Как приблизиться к теоретическому пределу? Ответ очевиден: читать очередную порцию данных во время записи предыдущей порции, т.е. выполнять работу по обмену с потоками одновременно. Что для этого потребуется? Управление потоками, объекты синхронизации? Вовсе нет. Задача выполнима в рамках паттерна IAsyncResult.

Далее я представлю метод, который копирует данные из потока в поток до тех пор, пока source поток не вернет 0 байт, но не больше чем указано в параметре count. Метод не очень большой (весь файл занимает около 100 строк вместе с объявлением класса и импортом пространств имен), но я разобью его на логические куски, так чтобы было удобнее комментировать. При желании куски можно будет сложить, и они будут работать.

Итак, объявление метода и проверка параметров:
public static long CopyData(Stream source, Stream destination, long count, int bufferSize)
{
    if (source == null)
        throw new ArgumentNullException("source");

    if (destination == null)
        throw new ArgumentNullException("destination");

    if (count < 0L)
        throw new ArgumentOutOfRangeException("count");

    if (bufferSize <= 0)
        throw new ArgumentOutOfRangeException("bufferSize");

    if(count == 0L)
        return 0L;
Метод возвращает число фактически скопированных байт. Оно может быть не равно параметру count в случае, если source поток достиг конца. Ничего интересного тут пока нет. Комментарии к этому куску опущу. А дальше они понадобятся:
    long totalBytes2Read = count;
  
    byte[] readingBytes = null;

    int bytesRead = 0;
    byte[] readBytes = null;

    int bytes2Write = 0;
    byte[] writingBytes = null;
     
    int bytesWrited = 0;
    byte[] freeBuffer = null;

    long totalWritedBytes = 0L;

    IAsyncResult asyncReadResult = null;
    IAsyncResult asyncWriteResult = null;
В порядке объявления:
  • totalBytes2Read - счетчик байт, которые осталось прочитать из потока source;
  • readingBytes - буфер данных, куда будут читаться данные. По мере прочтения буфер попадет в переменную readBytes;
  • bytesRead - число байт, фактически вчитанных в буфер;
  • readBytes - буфер, ожидающий записи в destination. После начала записи он будет храниться в переменной writingBytes;
  • bytes2Write - число прочитаных байт, но в другой переменной, соответствующей буферу, ожидающему записи;
  • writingBytes и bytesWrited - это записываемые байты и их число (число ранее вчитанных байт);
  • freeBuffer - освободившийся после записи буфер. При необходимости он будет использован заново, а это весьма вероятно;
  • asyncReadResult и asyncWriteResult - это результаты операций BeginRead и BeginWrite соответственно.
Как вы наверное уже догадались, буфер будет проходить циклически через несколько этапов: чтение -> ожидание записи - > запись -> ожидание чтения -> чтение... В тот момент когда буфер прошел этап чтения, переменная может хранить уже другой буфер, в который сразу же начинается чтение. Такие же этапы проходит значение числа прочитанных байт. Оно путешествует по соответствующим полям. Правда, рождается это число только в момент окончания чтения, и его не нужно хранить для начала следующего чтения, потому этапов для этого значения меньше.

Сами границы этапов я оформил анонимными методами. Дело в том, что некоторые из них из управляющего кода вызываются больше одного раза, а оформлять настоящие методы и передавать в них много параметров и результатов мне показалось неуклюжим подходом. Итак, начало и конец чтения:
Action beginRead = () =>
    {
        var size = (int) Math.Min(bufferSize, totalBytes2Read);
        asyncReadResult = source.BeginRead(
            readingBytes = freeBuffer ?? (new byte[size]),
            0,
            size,
            null,
            null);
    };

    Action endRead = () =>
    {
        bytes2Write = bytesRead = source.EndRead(asyncReadResult);
        readBytes = readingBytes;
        totalBytes2Read -= bytesRead;
    };
beginRead первым делом определяет размер данных, которые нужно заказать source потоку. Далее он анализирует наличие свободного буфера и при отсутствии его создает новый. Размер создаваемого буфера может быть меньше, чем указанно в параметре bufferSize, т.к. по достижении конца заказанного в count диапазона полноразмерный буфер не нужен. Далее полученный буфер (свободный, либо вновь созданный) записывается в переменную readingBytes и source потоку заказывается чтение в этот буфер куска данных, равному size.

Результат выполнения операции BeginRead записывается в локальную переменную. endRead завершает операцию чтения у потока source, получает фактический размер прочитанных байт и размазывает его по двум переменным. Хватило бы одной переменной, но у них разные назначения. Далее буфер, в который мы читали становится буфером, ожидающим записи. И в конце декрементируется счетчик оставшихся для чтения байт.
Action beginWrite = () => asyncWriteResult = destination.BeginWrite(
        writingBytes = readBytes,
        0,
        bytesWrited = bytes2Write,
        null,
        null);

    Action endWrite = () =>
    {
        destination.EndWrite(asyncWriteResult);
        freeBuffer = writingBytes;
        totalWritedBytes += bytesWrited;
    };
beginWrite переводит буфер и его размер в следующее состояние (присваивает другим переменным) и начинает операцию чтения. endWrite завершает операцию чтения и освобождает буфер (записывает его в переменную для освобожденного буфера). В заключении инкриментируется число фактически записанных байт.

Заключающий кусок кода - управляющая часть метода:
    beginRead();

    while (totalBytes2Read > 0L)
    {
        endRead();

        if (bytesRead <= 0)
            break;

        if (totalBytes2Read > 0L)
            beginRead();

        if (asyncWriteResult != null)
            endWrite();

        beginWrite();
    }

    endWrite();

    return totalWritedBytes;
}
Начинается управляющий кусок всегда с начала чтения. Дальше, в цикле, отслеживающим счетчик прочитанных байт, первым делом завершаем операцию чтения. Когда мы только вошли в цикл, то чтение только что начато, но пока оно не завершено, делать больше нечего - только ждать конца чтения. Затем анализируется число прочитанных байт. Если 0 - завершаем цикл. Читать больше нечего. Если есть что читать (работа со счетчиками ведется в операции endRead), то начинаем очередное чтение. Далее анализируется поле asyncWriteResult, для того, чтобы понять, начата ли операция записи. На первой итерации цикла оно пусто, т.е. окончание чтения пропускается. Во всех остальных итерациях будет ожидание завершения записи. После завершения записи - открываем новую операцию записи.

Вне тела цикла стоит ожидание записи в выходной поток. Внимательно посмотрев на управляющий цикл, можно заметить, что начала и концы парных операций стоят рядом в порядке где сначала конец операции, потом начало следующей операции. Конец чтения - сразу за ним начало чтения; конец записи и сразу начало записи. Таким образом, для завершения каждой асинхронной операции существует целая итерация цикла, в каждой из которых выполняется две асинхронной операции (чтение и запись). И каждая итерация будет занимать время, требующееся на максимально долгую операцию (чтения либо записи).

Для тестирования производительности я использовал производный от MemoryStream класс, который перекрывая соответствующие методы, регулировал время обращения к нему с помощью метода Thread.Sleep(int). Действительно, при больших объемах данных и малых размерах буфера время работы предложенного мной метода стремится к теоретическому пределу, а именно - к времени работы самого медленного потока. Даже при случайном распределении величины искувственной задержки для потоков, время копирования существенно меньше суммы времен работы с каждым из потоков.

Аналогичным способом можно оформить конвертирование файлов, либо другие асинхронные операции, выполняемые циклически. P.S. Надеюсь, что кому-нибудь пригодится данный подход. Мне кажется, что он довольно изящный )))

вторник, 23 сентября 2008 г.

Readonly Auto-Implemented Properties

Вчера расстроился по поводу реализации auto-implemented properties в C# 3.0. Началось все с того, что прочитав документацию по этому вопросу довольно продолжительное время назад, и встретив там цитату
To create a readonly auto-implemented property, give it a private set accessor.
считал, что auto-implemented readonly свойства имеют отношение к ключевому слову readonly. Вот цитата из описания этого ключевого слова:
The readonly keyword is a modifier that you can use on fields. When a field declaration includes a readonly modifier, assignments to the fields introduced by the declaration can only occur as part of the declaration or in a constructor in the same class.
Именно на эту статью в документации ведет гиперссылка из статьи про auto-implemented свойства. Воображение рисовало такую реализацию этого нововведения компилятором, где бы код
class Foo
{
  public int MyProperty { get; private set; }

  public Foo(int p)
  {
      MyProperty = p;
  }
}
соответствовал бы следующей реализации:
class Foo
{
  private readonly int _autoGeneratedReadonlyField;

  public Foo(int p)
  {
      autoGeneratedReadonlyField = p;
  }

  public int MyProperty
  {
      get { return _autoGeneratedReadonlyField; }
  }
}
Таким образом, такая реализация auto-implemented свойства с private set аксессором, максимально соответствовала бы документации. Так получилось, что довольно долго я считал, что readonly auto-implemented свойства нельзя модифицировать нигде, кроме конструктора класса (либо статического конструктора, если свойство статическое). Оказалось, что private set не генерит ничего кроме private аксессора к свойству, обращение к которому разрешено из кода любого метода или свойства класса, либо вложенных классов.

Итого, использование readonly auto-implemented свойств не имеет никакого отношения к readonly полям класса. Зачем же тогда в документации стоит эта ссылка на описание ключевого слова readonly? Думаю, что кроме меня есть еще жертвы дезинформации.

Вообще говоря, поведение private set модификатора auto-implemented свойства выбрано правильно. И такое поведение нужно, но кроме этого нужна адекватная документация. Все же, эмуляция readonly полей нужна. И на форуме в RSDN было предложено несколько вариантов синтаксиса, которые могли бы быть реализованы. Например,
public readonly string MyString { get; }

public int Property { get; readonly set; }

public int Property { readonly; }
Удобен был бы способ инициализации таких свойств при объявлении (по аналогии с полями).