воскресенье, 16 ноября 2008 г.

Хранилище записей фиксированной длины (часть III)

Хранилище записей фиксированной длины Хранилище записей фиксированной длины (часть II) Извиняюсь, что затянул с продолжением, совершенно не было времени и сил, особенно после праздников. Напомню, что в предыдущих постах была описана реализация хранилища записей фиксированной длины, выполненного в качестве обертки над Stream-ом с прямым доступом к полям записей. В этот раз опишу как прикрутить к нему простой индекс.
Предварительно хочу сделать акцент на том, что я не пытался создать библиотеку для решения общего случая, а решал конкретную задачу, но как можно более общим образом. В той конкретной задаче, которую я решал, достаточно индекса по одному полю, который бы искал записи по точному совпадению ключа. Уот таким я увидел интерфейс индекса хранилища:
public interface IStorageIndex<TKey>
{
    void Initialize(IEnumerable<TKey> keys, Func<int, TKey> readKeyFunc);

    int GetIndex(TKey key);

    void InsertIndex(TKey key, int rowIndex);
}
Метод инициализации индекса принимает последовательность всех ключей (забегая вперед отмечу, что ленивую последовательность), и метод получения ключа по заданному индексу. Полагаю, что такой метод инициализации должен удовлетворить некоторое кол-во реализаций индексов (строящихся в памяти, или на диске). Метод GetIndex - собственно то, ради чего и задумывался индекс. Он должен вернуть индекс записи с указанным ключем. Метод InsertIndex добавляет в индекс информацию о положении новой записи. Простоты ради я избавился от метода удаления записи из индекса. Простейшая реализация индекса выглядит так (комментировать не буду):
public class DictionaryIndex<TKey> : IStorageIndex<TKey>
{
    private readonly Dictionary<TKey, int> _indices = new Dictionary<TKey, int>();

    public void Initialize(IEnumerable<TKey> keys, Func<int, TKey> readKeyFunc)
    {
        int index = 0;
        foreach(TKey key in keys)
        {
            _indices[key] = index++;
        }
    }

    public int GetIndex(TKey key)
    {
        int result;
        return _indices.TryGetValue(key, out result)
            ? result
            : -1;
    }

    public void InsertIndex(TKey key, int rowIndex)
    {
        _indices.Add(key, rowIndex);
    }
}
Кстати, именно подобный индекс основанный на словаре успешно трудится уже около двух месяцев в некоторых заведениях ;) страны. Теперь опишу базовый класс индексированного хранилища записей. Для индесированного по одной колонке хранилища нам потребуется еще один generic параметр TKey, который будет представлять тип колонки, по которой проводится индексирование. Конструктор индексированного хранилища принимает дополнительные параметры - колонку, по которой проводится индексирование и реализацию индекса:
public class IndexedStorage<THeader, TRow, TKey> : StreamStorage<THeader, TRow>
    where THeader : RowBase<THeader>
    where TRow : RowBase<TRow>
{
    private readonly Column<TKey> _keyColumn;
    private readonly IStorageIndex<TKey> _index;

    protected IndexedStorage(Stream stream, Encoding encoding, Column<TKey> keyColumn, IStorageIndex<TKey> index)
        : base(stream, encoding)
    {
        if(index == null)
        {
            throw new ArgumentNullException("index");
        }
        if(keyColumn == null)
        {
            throw new ArgumentNullException("keyColumn");
        }
   
        if(keyColumn.Columns != RowBase<TRow>.ColumnCollection)
        {
            throw new ArgumentException();
        }
        _keyColumn = keyColumn;
        _index = index;
    }
    ...
}
Методы, потребующиеся для инициализации индекса (чтение ключа записи по указанному номеру и чтение последовательности ключей всех записей)
private TKey ReadKey(int index)
{
    return ReadValue(_keyColumn, index);
}

private IEnumerable<TKey> LoadKeys()
{
    int length = Length;

    using (var notOwnedStream = new NotOwnedStreamProxy(Stream))
    using (var bufferedStream = new BufferedStream(notOwnedStream))
    using (var reader = new BinaryReader(bufferedStream))
    {                   
        byte[] buffer =
            new byte[RowBase<TRow>.ColumnCollection.Size - _keyColumn.Size];
         bufferedStream.Position = RowBase<THeader>.ColumnCollection.Size + _keyColumn.Offset;

        for (int i = 0; i < length; i++)
        {
            yield return _keyColumn.ReadValue(reader);
         
            if (i < length - 1)
            {
                reader.ReadBytes(buffer.Length);
            }
        }
    }
}
второй метод потребует комментариев. Дело в том, что я его изуродовал, пытаясь выжать производительность. Самая простая его реализация - вызов в цикле вышеописанного метода ReadKey(int index) безобразно долго работает. Дело в том, что при таком подходе потребуется большое (по числу записей) количество операций Seek. Хоть изменения позиции и небольшие, но сама операция на FileStream (а это основной сценарий использования этого хранилища) все равно занимает значительное время. Вместо операции Seek я решил использовать чтение в буфер, размер которого равен размеру записи без ключевого поля. Т.е. прочитав первый ключ и вчитав в этот буфер дальнейшее содержимое, позиция потока оказывается ровно перед ключем следующего поля. Еще одна оптимизация - использование буферизованного потока дает небольшое преимущество перед чтением напрямую. Небольшое, но я решил им все же воспользоваться. Потому потребовался другой экземпляр BinaryReader. Вся эта оптимизация привела к следующей проблеме: при освобождении BufferedStream, равно как и BinaryReader-а для чтения из буферизованного стрима, освобождается несущий стрим хранилища. Решений может быть несколько: постараться не освобождать эти сущности (хранить их в полях класса), либо оградить несущий стрим от деструктивных воздействий вспомогательных сущностей. Я предпочел второе, хоть и пришлось много постучать по кнопкам: пришлось реализовать хитрый proxy для класса Stream, который делегирует все методы и свойства низлежащему стриму, за исключением метода Close(). Этот класс я худобедно назвал NotOwnedStreamProxy. Полные его исходники приводить не буду, ограничусь выбранными наугад методами (остальные выглядят похожим образом):
public override void Close()
{
    // BaseStream.Close(); ВСЕ РАДИ ТОГО, ЧТОБЫ НЕ ДОПУСТИТЬ ВЫЗОВ ЭТОГО!!!
    base.Close();
}

public override int EndRead(IAsyncResult asyncResult)
{
    return BaseStream.EndRead(asyncResult);
}

public override void EndWrite(IAsyncResult asyncResult)
{
    BaseStream.EndWrite(asyncResult);
}

public override int ReadByte()
{
    return BaseStream.ReadByte();
}
И не говорите! Сам не в восторге. Метод инициализации базового индексированного хранилища теперь выглядит так:
protected override void Initialize()
{
    lock (SyncRoot)
    {
        base.Initialize();

        _index.Initialize(LoadKeys(), ReadKey);
    }
}
Еще пара нехитрых методов базового индексированного хранилища:
protected int GetIndex(TKey key)
{
    lock(SyncRoot)
    {
        return _index.GetIndex(key);
    }
}

protected TRow InsertRow(TKey key)
{
    int newRowIndex;
    lock(SyncRoot)
    {
        newRowIndex = Length;

        _index.InsertIndex(key, newRowIndex);

        Length += 1;
        WriteValue(_keyColumn, newRowIndex, key);
    }

    return GetRow(newRowIndex);
}
Осталось чуть-чуть. Уже добрались до конкретного индексированного хранилища:
class IndexedStorage : IndexedStorage<TestHeader, TestRow, Guid>
{
    public IndexedStorage(Stream stream)
        : base(
            stream,
            Encoding.Default,
            TestRow.IdColumn,
            new DictionaryIndex<Guid>())
    {
        base.Initialize();
    }

    public TestRow AddNewRow(Guid id)
    {
        return InsertRow(id);
    }

    public TestRow GetRow(Guid id)
    {
        return GetRow(GetIndex(id));
    }
}
Класс TestRow описан в конце первого поста о хранилище записей.
В следующий раз приведу результаты стресс-тестирования этого безобразия в сравнении с SQLite.

вторник, 4 ноября 2008 г.

Хранилище записей фиксированной длины (часть II)

В предыдущем посте (Хранилище записей фиксированной длины) было описано все необходимое для класса хранилища. Теперь можно приступать к реализации хранилища.

Напомню: хранилище - надстройка над классом Stream для хранения типизированных записей фиксированной длины и некого заголовка. Класс хранилища параметризован двумя generic параметрами: типом заголовка хранилища и типом записи хранилища:
public class StreamStorage<THeader, TRow> : StreamStorage, IDisposable
    where THeader : RowBase<THeader>
    where TRow : RowBase<TRow>
{
    private readonly object _syncRoot = new object();
    private readonly Stream _stream;
    private readonly BinaryWriter _writer;
    private readonly BinaryReader _reader;
    private int _length;

    protected StreamStorage(
        Stream stream,
        Encoding encoding)
    {
        _stream = stream;
        encoding = encoding ?? Encoding.Default;

        if (stream.CanWrite)
        {
            _writer = new BinaryWriter(_stream, encoding);
        }
        if (stream.CanRead)
        {
            _reader = new BinaryReader(_stream, encoding);
        }
    }

    public void Dispose()
    {
        if (_writer != null)
        {
            _writer.Close();
        }
        if(_reader != null)
        {
            _reader.Close();
        }
        _stream.Close();
    }
Конструктор хранилища принимает поток данных, в котором хрянятся записи и кодировку для записи строковых полей (реализацию строковых колонок я пока не приводил). Метод Dispose() освобождает ресурсы. Следом короткая форма доступа к экземплярам коллекций колонок заголовка и записи хранилища:
static ColumnCollection HeaderColumns
    {
        get { return RowBase<THeader>.ColumnCollection; }
    }

    static ColumnCollection RowColumns
    {
        get { return RowBase<TRow>.ColumnCollection; }
    }
Затем доступ к ридеру, райтеру и собственно потоку данных:
protected BinaryWriter Writer
    {
        get
        {
            if (_writer == null)
            {
                throw new NotSupportedException();
            }
            return _writer;
        }
    }

    protected BinaryReader Reader
    {
        get
        {
            if (_reader == null)
            {
                throw new NotSupportedException();
            }
            return _reader;
        }
    }

    protected Stream Stream
    {
        get { return _stream; }
    }

    protected object SyncRoot { get { return _syncRoot; } }
Далее серия методов, производящих строки. Есть небольшие проблемы в том каким образом создавать экземпляры TRow и THeader, ведь экземпляры строк требуют экземпляр хранилища и свой индекс при создании, т.е. мы не можем воспользоваться их конструктором по умолчанию.

Можно было параметризовать хранилище фабрикой строк и фабрикой заголовка, но я пришел к решению предусмотреть расширение через Template Method.
protected virtual TRow CreateRowInstance(int index)
    {
        return CreateRowInstance<TRow>(this, index);
    }

    protected virtual THeader CreateHeaderInstance()
    {
        return CreateRowInstance<THeader>(this, 0);
    }

    private static T CreateRowInstance<T>(StreamStorage storage, int index)
        where T : RowBase<T>
    {
        return (T)Activator.CreateInstance(typeof(T), storage, index);
    }

    protected TRow GetRow(int index)
    {
        lock (SyncRoot)
        {
            return CreateRowInstance(index);
        }
    }

    public THeader Header { get; private set; }
Такое решение позволит без дополнительных усилий создавать экземпляры строк, которые принимают в конструкторе экземпляр хранилища и индекс. Пример строки TestRow из предыдущего поста принимает именно такой набор параметров. Если потребуется изменить сигнатуру конструктора записи, то производный класс хранилища сможет перекрыть методы CreateRowInstance и CreateHeaderInstance.

Далее способ управления размером хранилища:
protected int Length
    {
        get
        {
            return _length;
        }
        set
        {
            if (value < 0)
            {
                throw new ArgumentOutOfRangeException("value");
            }

            long streamLength = HeaderColumns.Size + RowColumns.Size * value;

            lock (SyncRoot)
            {
                Stream.SetLength(streamLength);

                _length = value;
            }
        }
    }
Реализация свойства Length практически не нуждается в комментариях. Она устанавливает длину потока данных руководствуясь размером заголовка, размером записи и требуемым кол-вом записей. Метод установки позиции потока для чтения или записи значения:
private void SetPosition(int rowIndex, Column column)
    {
        if (column == null)
        {
            throw new ArgumentNullException("column");
        }

        int pos;
        if (ReferenceEquals(column.Columns, HeaderColumns))
        {
            pos = column.Offset;
        }
        else
        {
            if (ReferenceEquals(column.Columns, RowColumns))
            {
                if (rowIndex < 0 || rowIndex >= Length)
                {
                    throw new ArgumentOutOfRangeException("rowIndex");
                }
                pos = HeaderColumns.Size + RowColumns.Size * rowIndex + column.Offset;
            }
            else
            {
                throw new ArgumentException("", "column");
            }
        }

        if (Stream.Position != pos)
        {
            Stream.Position = pos;
        }
    }
Написано много, но смысл простой: если колонка относится к заголовку, то установить позицию потока к смещению колонки. Если колонка относится к типу записи - установить позицию потока руководствуясь размером заголовка, размером записи, индексом записи и смещением колонки.

Следует заметить, что этот метод не будет работать адекватно при идентичных типах TRow и THeader. Проверку на такое совпадение я опустил. Если потребуется сделать что-то подобное, то лучше указать тип заголовка без колонок.

Теперь сердце хранилища - методы чтения и записи значений:
public override T ReadValue<T>(Column<T> column, int rowIndex)
    {
        if (column == null)
        {
            throw new ArgumentNullException("column");
        }
        lock (SyncRoot)
        {
            SetPosition(rowIndex, column);
            return column.ReadValue(Reader);
        }
    }

    public override void WriteValue<T>(Column<T> column, int rowIndex, T value)
    {
        if (column == null)
        {
            throw new ArgumentNullException("column");
        }
        lock (SyncRoot)
        {
            SetPosition(rowIndex, column);
            column.WriteValue(Writer, value);
        }
    }
Осталось всего ничего - код инициализации хранилища:
protected virtual void Initialize()
    {
        Header = CreateHeaderInstance();

        if (RowColumns.Size == 0)
        {
            CreateRowInstance(0); // вызвать инициализацию колонок записи.
        }

        int headerSize = HeaderColumns.Size;
        if (Stream.Length == 0)
        {
            Stream.SetLength(headerSize);
            Header.Initialize();
        }
        else
        {
            _length = (int)((Stream.Length - headerSize) / RowColumns.Size);

            if (headerSize + Length * RowColumns.Size != Stream.Length)
            {
                throw new InvalidOperationException("Unexpected stream length.");
            }
        }
    }
Сам по себе этот код не сложен, но тащит за собой одну проблему: кто-то его должен вызывать. Но это не может быть конструктор хранилища, потому как метод инициализации использует виртуальные методы. В отличие от C++, в C# виртуальные методы могут быть вызваны из конструктора базового класса, но существует опасность что перекрытые методы могут использовать поля производного класса, которые еще не будут проинициализированы. Потому из конструктора StreamStorage этот метод я вызывать не стал. Не нравится мне так же решение с отложенной инициализацией, где пришлось бы чуть ли не во всех методах вставлять проверку на наличие инициализации. Отказ от Template Method в пользу параметризации хранилища фабриками строк тоже не идеальное решение. Ведь одно из очевидных решений - реализация фабрик в классе произвольного хранилища. Но тогда будут сложности с передачей таких фабрик через конструктор класса StreamStorage.

 Хорошим решением этой проблемы стало бы решение сделать класс StreamStorage не расширяемым (sealed), опубликовать методы GetRow(int index), свойство Length и при необходимости расширения использовать композицию... Но, при реализации индексированного хранилища потребуется доступ к BinaryReader, BinaryWriter, Stream-у и даже к корню синхронизации. Не хотелось бы делать эти вещи общедоступными, потому было принято решение расширять хранилище через наследование (вариант понапихать всю необходимую функциональность в один класс кажется мне скверным).

Итого обязанность вызова метода Initialize назначается на классы, расширяющие StreamStorage. Резюме: в итоге этого поста завершена реализация хранилища записей с фиксированной длинной, с доступом к записям по индексу, с потокобезопасным доступом к полям записей. Для того, чтобы воспользоваться хранилищем, нужно определить два типа записей (один для заголовка) примерно как TestRow в предыдущем посте, определить свой класс хранилища, унаследовавшись от StreamStorage<THeader, TRow> и предоставить доступ к методам GetRow(int index) и способу управления размером хранилища.

При необходимости производное хранилище может реализовать интерфейс списка (IList), организовать кэширование записей и вести себя как полноценная коллекция. Напомню, что методы ReadValue и WriteValue у хранилища чисто вспомогательные, что они нужны для базового класса RowBase, и что обращение к полям по задумке должно происходить через свойства класса записи, например как в TestRow из предыдущего поста. Впрочем, методы ReadValue и WriteValue не мешают, а даже немного дополняют функциональность хранилища возможностью получать доступ к значениям не создавая экземпляры записей, потому я не стал предпринимать попыток скрыть эти методы.

В следующем посте расскажу о способах индексации этого хранилища.