четверг, 25 сентября 2008 г.

Asynchronous Stream Copying

Не так давно возникла необходимость копирования данных неопределенного размера в из одного потока данных (System.IO.Stream) в другой максимально быстрым образом. Речь шла о передаче больших файлов по локальной сети.

Самый простой способ выполнить требуюмую операцию - в цикле читать кусок данных из source потока и затем писать его в destination поток. При таком подходе время копирования данных складывается из суммарного времени чтения и суммарного времени записи, однако, теоретический нижний предел выполнения такой операции - суммарное время работы с самым медленным потоком.

Как приблизиться к теоретическому пределу? Ответ очевиден: читать очередную порцию данных во время записи предыдущей порции, т.е. выполнять работу по обмену с потоками одновременно. Что для этого потребуется? Управление потоками, объекты синхронизации? Вовсе нет. Задача выполнима в рамках паттерна IAsyncResult.

Далее я представлю метод, который копирует данные из потока в поток до тех пор, пока source поток не вернет 0 байт, но не больше чем указано в параметре count. Метод не очень большой (весь файл занимает около 100 строк вместе с объявлением класса и импортом пространств имен), но я разобью его на логические куски, так чтобы было удобнее комментировать. При желании куски можно будет сложить, и они будут работать.

Итак, объявление метода и проверка параметров:
public static long CopyData(Stream source, Stream destination, long count, int bufferSize)
{
    if (source == null)
        throw new ArgumentNullException("source");

    if (destination == null)
        throw new ArgumentNullException("destination");

    if (count < 0L)
        throw new ArgumentOutOfRangeException("count");

    if (bufferSize <= 0)
        throw new ArgumentOutOfRangeException("bufferSize");

    if(count == 0L)
        return 0L;
Метод возвращает число фактически скопированных байт. Оно может быть не равно параметру count в случае, если source поток достиг конца. Ничего интересного тут пока нет. Комментарии к этому куску опущу. А дальше они понадобятся:
    long totalBytes2Read = count;
  
    byte[] readingBytes = null;

    int bytesRead = 0;
    byte[] readBytes = null;

    int bytes2Write = 0;
    byte[] writingBytes = null;
     
    int bytesWrited = 0;
    byte[] freeBuffer = null;

    long totalWritedBytes = 0L;

    IAsyncResult asyncReadResult = null;
    IAsyncResult asyncWriteResult = null;
В порядке объявления:
  • totalBytes2Read - счетчик байт, которые осталось прочитать из потока source;
  • readingBytes - буфер данных, куда будут читаться данные. По мере прочтения буфер попадет в переменную readBytes;
  • bytesRead - число байт, фактически вчитанных в буфер;
  • readBytes - буфер, ожидающий записи в destination. После начала записи он будет храниться в переменной writingBytes;
  • bytes2Write - число прочитаных байт, но в другой переменной, соответствующей буферу, ожидающему записи;
  • writingBytes и bytesWrited - это записываемые байты и их число (число ранее вчитанных байт);
  • freeBuffer - освободившийся после записи буфер. При необходимости он будет использован заново, а это весьма вероятно;
  • asyncReadResult и asyncWriteResult - это результаты операций BeginRead и BeginWrite соответственно.
Как вы наверное уже догадались, буфер будет проходить циклически через несколько этапов: чтение -> ожидание записи - > запись -> ожидание чтения -> чтение... В тот момент когда буфер прошел этап чтения, переменная может хранить уже другой буфер, в который сразу же начинается чтение. Такие же этапы проходит значение числа прочитанных байт. Оно путешествует по соответствующим полям. Правда, рождается это число только в момент окончания чтения, и его не нужно хранить для начала следующего чтения, потому этапов для этого значения меньше.

Сами границы этапов я оформил анонимными методами. Дело в том, что некоторые из них из управляющего кода вызываются больше одного раза, а оформлять настоящие методы и передавать в них много параметров и результатов мне показалось неуклюжим подходом. Итак, начало и конец чтения:
Action beginRead = () =>
    {
        var size = (int) Math.Min(bufferSize, totalBytes2Read);
        asyncReadResult = source.BeginRead(
            readingBytes = freeBuffer ?? (new byte[size]),
            0,
            size,
            null,
            null);
    };

    Action endRead = () =>
    {
        bytes2Write = bytesRead = source.EndRead(asyncReadResult);
        readBytes = readingBytes;
        totalBytes2Read -= bytesRead;
    };
beginRead первым делом определяет размер данных, которые нужно заказать source потоку. Далее он анализирует наличие свободного буфера и при отсутствии его создает новый. Размер создаваемого буфера может быть меньше, чем указанно в параметре bufferSize, т.к. по достижении конца заказанного в count диапазона полноразмерный буфер не нужен. Далее полученный буфер (свободный, либо вновь созданный) записывается в переменную readingBytes и source потоку заказывается чтение в этот буфер куска данных, равному size.

Результат выполнения операции BeginRead записывается в локальную переменную. endRead завершает операцию чтения у потока source, получает фактический размер прочитанных байт и размазывает его по двум переменным. Хватило бы одной переменной, но у них разные назначения. Далее буфер, в который мы читали становится буфером, ожидающим записи. И в конце декрементируется счетчик оставшихся для чтения байт.
Action beginWrite = () => asyncWriteResult = destination.BeginWrite(
        writingBytes = readBytes,
        0,
        bytesWrited = bytes2Write,
        null,
        null);

    Action endWrite = () =>
    {
        destination.EndWrite(asyncWriteResult);
        freeBuffer = writingBytes;
        totalWritedBytes += bytesWrited;
    };
beginWrite переводит буфер и его размер в следующее состояние (присваивает другим переменным) и начинает операцию чтения. endWrite завершает операцию чтения и освобождает буфер (записывает его в переменную для освобожденного буфера). В заключении инкриментируется число фактически записанных байт.

Заключающий кусок кода - управляющая часть метода:
    beginRead();

    while (totalBytes2Read > 0L)
    {
        endRead();

        if (bytesRead <= 0)
            break;

        if (totalBytes2Read > 0L)
            beginRead();

        if (asyncWriteResult != null)
            endWrite();

        beginWrite();
    }

    endWrite();

    return totalWritedBytes;
}
Начинается управляющий кусок всегда с начала чтения. Дальше, в цикле, отслеживающим счетчик прочитанных байт, первым делом завершаем операцию чтения. Когда мы только вошли в цикл, то чтение только что начато, но пока оно не завершено, делать больше нечего - только ждать конца чтения. Затем анализируется число прочитанных байт. Если 0 - завершаем цикл. Читать больше нечего. Если есть что читать (работа со счетчиками ведется в операции endRead), то начинаем очередное чтение. Далее анализируется поле asyncWriteResult, для того, чтобы понять, начата ли операция записи. На первой итерации цикла оно пусто, т.е. окончание чтения пропускается. Во всех остальных итерациях будет ожидание завершения записи. После завершения записи - открываем новую операцию записи.

Вне тела цикла стоит ожидание записи в выходной поток. Внимательно посмотрев на управляющий цикл, можно заметить, что начала и концы парных операций стоят рядом в порядке где сначала конец операции, потом начало следующей операции. Конец чтения - сразу за ним начало чтения; конец записи и сразу начало записи. Таким образом, для завершения каждой асинхронной операции существует целая итерация цикла, в каждой из которых выполняется две асинхронной операции (чтение и запись). И каждая итерация будет занимать время, требующееся на максимально долгую операцию (чтения либо записи).

Для тестирования производительности я использовал производный от MemoryStream класс, который перекрывая соответствующие методы, регулировал время обращения к нему с помощью метода Thread.Sleep(int). Действительно, при больших объемах данных и малых размерах буфера время работы предложенного мной метода стремится к теоретическому пределу, а именно - к времени работы самого медленного потока. Даже при случайном распределении величины искувственной задержки для потоков, время копирования существенно меньше суммы времен работы с каждым из потоков.

Аналогичным способом можно оформить конвертирование файлов, либо другие асинхронные операции, выполняемые циклически. P.S. Надеюсь, что кому-нибудь пригодится данный подход. Мне кажется, что он довольно изящный )))

22 комментария:

  1. Как насчет обработки исключений? мне кажется, они у Вас не в том потоке будут бросаться :)

    ОтветитьУдалить
  2. meowtch>Как насчет обработки исключений? мне кажется, они у Вас не в том потоке будут бросаться :)
    Как-раз в том самом! Методы BeginRead/BeginWrite/EndRead/EndWrite вызываются из управляющего потока, и исключения будут бросаться именно в управляющем потоке.

    ОтветитьУдалить
  3. Очень не хватает в таком методе параметра CancellationToken ;о)

    ОтветитьУдалить
  4. _FRED_>Очень не хватает в таком методе параметра CancellationToken ;о)

    Мне кажется что это очень легко исправить.

    ОтветитьУдалить
  5. Маленький коммент - глагол to read является неправильным и во всех формах выглядит одинаково - read, просто читается по-разному :) Очень часто вижу в коде readed :))

    ОтветитьУдалить
  6. Большое спасибо за замечание, поправлю в ближайшее время.

    ОтветитьУдалить
  7. Скорость чтения с HDD - мелочь в сравнении с пропускной способностью сети.
    Я, например, сжимал файлы(не все естественно) перед отправкой. Передача папки с мелкими файлами было на порядок быстрее.

    ОтветитьУдалить
  8. Зависит от сети. Сталкивался со случаем, когда сосал именно HDD.
    Однако задумка именно в том, чтобы можно было комбинировать некоторое количество этапов передачи данных (даже со сжатием и расжатием) и получать время передачи устремляющееся к работе самого медленного этапа, а не суммы всех этапов.

    ОтветитьУдалить
  9. Хочу снова поднять тему обработки исключений.

    В этом месте

    if (totalBytes2Read > 0L)
    beginRead(); // 1

    if (asyncWriteResult != null)
    endWrite(); // 2

    если исключение произойдет в момент записи (2), никто не сделает EndRead для уже запущенной (1) операции чтения.

    То же самое в случае ошибки чтения на втором этапе. Кто сделает EndWrite уже стартовавшей операции записи?

    ОтветитьУдалить
  10. Приведу полностью чтобы было понятнее:

    while (totalBytes2Read > 0L)
    {
    endRead(); // 1

    if (bytesReaded <= 0)
    break;

    if (totalBytes2Read > 0L)
    beginRead(); // 2

    if (asyncWriteResult != null)
    endWrite(); // 3

    beginWrite(); //4
    }

    2 -> 3 -> exception = потерян вызов EndRead
    4 -> 1 -> exception = потерян вызов EndWrite

    ОтветитьУдалить
  11. Анонимный комментирует>Приведу полностью чтобы было понятнее

    Да все понятно. Все верно, т.е. в коде лажа. Благодарю за бдительность.
    Надо подумать, как бы так поправить, что бы в код не погружаться заново...

    ОтветитьУдалить
  12. Не думали пока? :)

    з.ы. в принципе .net 4 и Rx можно было задействовать...

    ОтветитьУдалить
  13. Анонимный комментирует...>Не думали пока? :)
    Думал немного. Проблема еще в том, что нам не просто надо вызвать недостающий EndXXX, а и обеспечить такое поведение, что исключение недостающего EndXXX не должно перетереть первичное исключение из EndYYY.
    Допустим, EndRead выбросил исключение. Нам надо вызвать EndWrite (если был BeginWrite) и проигнорировать его исключение, выбросив наружу исключение от EndRead. И наоборот.
    Обеспечить такое для EndRead и EndWrite одновременно не получается с помощью добавления соответствующих блоков try/catch в основной цикл. Выходит, что надо выносить эту логику куда-то (например, в классы с IDisposable). А это уже значительная переработка кода, да и поста в целом.

    Анонимный комментирует...>з.ы. в принципе .net 4 и Rx можно было задействовать...
    Да, было бы любопытно. Но это совершенно другая история.
    В принципе, в .Net 4 есть встроенный метод копирования потоков. Но он копирует выполняя синхронные вызовы последовательно.


    Разрешите поинтересоваться, данный метод копирования актуален для вас, или представляет чисто академический интерес?

    ОтветитьУдалить
  14. По первой части: да я все это понимаю, потому и не берусь сам что-либо менять, потому как хз как правильно должно быть.

    А .net4 я упомянул потому как там есть AggrgateException со списком исключений... на первый взгляд то, что надо.

    Нет, не академический. Хотел честно скомуниздить к себе в проект, но т.к. он "серверный" и ошибки увидел, то перетаптываюсь Stream.Copy.

    Вообще параллельное копирование очень было бы полезно: в моем случае например получаю файло по сети, считаю хэш и жму в файловый поток. Ну вот сходу параллелим чтение по сети + хэширование и сжатие с записью на винт. Имхо затея имеет смысл.

    ОтветитьУдалить
  15. p.s. AggregateException в связке с Task упомянул.
    p.s.1. могу вот подсказку кинуть http://blogs.msdn.com/b/pfxteam/archive/2009/06/30/9809774.aspx - там ближе к концу самое интересное.
    но под свои нужды переделать как ума не приложу

    ОтветитьУдалить
  16. Анонимный комментирует...>
    По первой части: да я все это понимаю, потому и не берусь сам что-либо менять, потому как хз как правильно должно быть.

    Я думаю что понимаю, как должно быть, но вот преобразовать код к этому пониманию - это сложнее.

    А .net4 я упомянул потому как там есть AggrgateException со списком исключений... на первый взгляд то, что надо.

    AggregateException довольно сильно усложняет обработку исключений, в частности catch по типу исключения. Приходится матчить руками...

    Нет, не академический. Хотел честно скомуниздить к себе в проект, но т.к. он "серверный" и ошибки увидел, то перетаптываюсь Stream.Copy.

    Для серверного варианта мой код не очень подходит. Дело в том, что основной цикл держит поток пока не закончится копирование. Притом что используются асинхронные вызовы, управляющий поток все время курит в ожидании завершения очередного блокирующего EndXXX. Был у меня еще вариант на Callback-ах, но он отличался лишь тем, что курил не поток управляющего цикла, а потоки пула.

    ОтветитьУдалить
  17. Анонимный комментирует...>
    p.s.1. могу вот подсказку кинуть http://blogs.msdn.com/b/pfxteam/archive/2009/06/30/9809774.aspx - там ближе к концу самое интересное.
    но под свои нужды переделать как ума не приложу


    После беглого просмотра понял, что подобным свойством (одновременное чтение и запись) обладает лишь последний пример. Остальные используют лишь один буфер для чтения и записи, следовательно могут выполнять эти операции лишь последовательно.

    А какие проблемы с последним примером по ссылке? Он, как я понимаю, и не блокирует потоки при ожидании сразу двух EndXXX.

    ОтветитьУдалить
  18. По ссылке не устраивает отсутствие возможности получения количества скопированных байтов. И я сходу не смог переделать, слишком плохо знаю все эти нововведения .net4 на практических примерах.

    А то что ваш код держит поток, так это не страшно, т.к. я использую wcf где операции служб в любом случае держат поток до конца выполнения своей работы (ну нужно же вернуть ответ).

    ОтветитьУдалить
  19. Анонимный комментирует...>

    По ссылке не устраивает отсутствие возможности получения количества скопированных байтов. И я сходу не смог переделать, слишком плохо знаю все эти нововведения .net4 на практических примерах.


    Я его сходу скомпилировать-то не смог. Изменились TaskContinuationOptions, пропали методы вроде IngoreExceptions, изменились сигнатуры методов...

    Но направление для получения количества скопированных байт подсказать смогу. Нужно Iterate педелеать так, что бы она работала не с Task-ами, а с Task<TInput> и Task<TOutput>-ами. Плюс допилить ее что бы она накапливала результат в точности как Enumerable.Aggregate, с помощью указанной функции (и для копирования потоков она должна будет накапливать результаты в long-е, а не в int-е).

    Далее, CopyStreamToStreamAsync должен возвращать IEnumerable<Task<int>>. Результат input.ReadTask подойдет как есть, а результат Task.Factory.ContinuesWhenAll нужно преобразовать к Task<int>, возвращающий 0 в качестве результата.

    Если бы суть изменений заключалась лишь в этом, я бы быстро набросал код. Но придется вникать в тонкости работы с Task-ами, вкуривать смысл изменений фреймворка с момента публикации поста, потому быстро не получится.

    Да, все-таки тот пример из блога не такой, как у меня. Перед формированием очередного запроса на чтение тот пример будет ждать окончания и чтения и записи. Мой сделает запрос на чтение не дожидаясь окончания записи, потому будет меньше простаивать в случае когда чтение быстрее записи.

    Я еще подумаю над тем, как исправить мой код (только фоном, не в приоритете). Еще раз спасибо за бдительность.

    ОтветитьУдалить
  20. Результат input.ReadTask подойдет как есть, а результат Task.Factory.ContinuesWhenAll нужно преобразовать к Task, возвращающий число число прочитанных байт, взятых из readTask.

    ОтветитьУдалить
  21. Вот набросок. Форсировать и ловить исключения не пробовал.

    ОтветитьУдалить