воскресенье, 16 ноября 2008 г.

Хранилище записей фиксированной длины (часть III)

Хранилище записей фиксированной длины Хранилище записей фиксированной длины (часть II) Извиняюсь, что затянул с продолжением, совершенно не было времени и сил, особенно после праздников. Напомню, что в предыдущих постах была описана реализация хранилища записей фиксированной длины, выполненного в качестве обертки над Stream-ом с прямым доступом к полям записей. В этот раз опишу как прикрутить к нему простой индекс.
Предварительно хочу сделать акцент на том, что я не пытался создать библиотеку для решения общего случая, а решал конкретную задачу, но как можно более общим образом. В той конкретной задаче, которую я решал, достаточно индекса по одному полю, который бы искал записи по точному совпадению ключа. Уот таким я увидел интерфейс индекса хранилища:
public interface IStorageIndex<TKey>
{
    void Initialize(IEnumerable<TKey> keys, Func<int, TKey> readKeyFunc);

    int GetIndex(TKey key);

    void InsertIndex(TKey key, int rowIndex);
}
Метод инициализации индекса принимает последовательность всех ключей (забегая вперед отмечу, что ленивую последовательность), и метод получения ключа по заданному индексу. Полагаю, что такой метод инициализации должен удовлетворить некоторое кол-во реализаций индексов (строящихся в памяти, или на диске). Метод GetIndex - собственно то, ради чего и задумывался индекс. Он должен вернуть индекс записи с указанным ключем. Метод InsertIndex добавляет в индекс информацию о положении новой записи. Простоты ради я избавился от метода удаления записи из индекса. Простейшая реализация индекса выглядит так (комментировать не буду):
public class DictionaryIndex<TKey> : IStorageIndex<TKey>
{
    private readonly Dictionary<TKey, int> _indices = new Dictionary<TKey, int>();

    public void Initialize(IEnumerable<TKey> keys, Func<int, TKey> readKeyFunc)
    {
        int index = 0;
        foreach(TKey key in keys)
        {
            _indices[key] = index++;
        }
    }

    public int GetIndex(TKey key)
    {
        int result;
        return _indices.TryGetValue(key, out result)
            ? result
            : -1;
    }

    public void InsertIndex(TKey key, int rowIndex)
    {
        _indices.Add(key, rowIndex);
    }
}
Кстати, именно подобный индекс основанный на словаре успешно трудится уже около двух месяцев в некоторых заведениях ;) страны. Теперь опишу базовый класс индексированного хранилища записей. Для индесированного по одной колонке хранилища нам потребуется еще один generic параметр TKey, который будет представлять тип колонки, по которой проводится индексирование. Конструктор индексированного хранилища принимает дополнительные параметры - колонку, по которой проводится индексирование и реализацию индекса:
public class IndexedStorage<THeader, TRow, TKey> : StreamStorage<THeader, TRow>
    where THeader : RowBase<THeader>
    where TRow : RowBase<TRow>
{
    private readonly Column<TKey> _keyColumn;
    private readonly IStorageIndex<TKey> _index;

    protected IndexedStorage(Stream stream, Encoding encoding, Column<TKey> keyColumn, IStorageIndex<TKey> index)
        : base(stream, encoding)
    {
        if(index == null)
        {
            throw new ArgumentNullException("index");
        }
        if(keyColumn == null)
        {
            throw new ArgumentNullException("keyColumn");
        }
   
        if(keyColumn.Columns != RowBase<TRow>.ColumnCollection)
        {
            throw new ArgumentException();
        }
        _keyColumn = keyColumn;
        _index = index;
    }
    ...
}
Методы, потребующиеся для инициализации индекса (чтение ключа записи по указанному номеру и чтение последовательности ключей всех записей)
private TKey ReadKey(int index)
{
    return ReadValue(_keyColumn, index);
}

private IEnumerable<TKey> LoadKeys()
{
    int length = Length;

    using (var notOwnedStream = new NotOwnedStreamProxy(Stream))
    using (var bufferedStream = new BufferedStream(notOwnedStream))
    using (var reader = new BinaryReader(bufferedStream))
    {                   
        byte[] buffer =
            new byte[RowBase<TRow>.ColumnCollection.Size - _keyColumn.Size];
         bufferedStream.Position = RowBase<THeader>.ColumnCollection.Size + _keyColumn.Offset;

        for (int i = 0; i < length; i++)
        {
            yield return _keyColumn.ReadValue(reader);
         
            if (i < length - 1)
            {
                reader.ReadBytes(buffer.Length);
            }
        }
    }
}
второй метод потребует комментариев. Дело в том, что я его изуродовал, пытаясь выжать производительность. Самая простая его реализация - вызов в цикле вышеописанного метода ReadKey(int index) безобразно долго работает. Дело в том, что при таком подходе потребуется большое (по числу записей) количество операций Seek. Хоть изменения позиции и небольшие, но сама операция на FileStream (а это основной сценарий использования этого хранилища) все равно занимает значительное время. Вместо операции Seek я решил использовать чтение в буфер, размер которого равен размеру записи без ключевого поля. Т.е. прочитав первый ключ и вчитав в этот буфер дальнейшее содержимое, позиция потока оказывается ровно перед ключем следующего поля. Еще одна оптимизация - использование буферизованного потока дает небольшое преимущество перед чтением напрямую. Небольшое, но я решил им все же воспользоваться. Потому потребовался другой экземпляр BinaryReader. Вся эта оптимизация привела к следующей проблеме: при освобождении BufferedStream, равно как и BinaryReader-а для чтения из буферизованного стрима, освобождается несущий стрим хранилища. Решений может быть несколько: постараться не освобождать эти сущности (хранить их в полях класса), либо оградить несущий стрим от деструктивных воздействий вспомогательных сущностей. Я предпочел второе, хоть и пришлось много постучать по кнопкам: пришлось реализовать хитрый proxy для класса Stream, который делегирует все методы и свойства низлежащему стриму, за исключением метода Close(). Этот класс я худобедно назвал NotOwnedStreamProxy. Полные его исходники приводить не буду, ограничусь выбранными наугад методами (остальные выглядят похожим образом):
public override void Close()
{
    // BaseStream.Close(); ВСЕ РАДИ ТОГО, ЧТОБЫ НЕ ДОПУСТИТЬ ВЫЗОВ ЭТОГО!!!
    base.Close();
}

public override int EndRead(IAsyncResult asyncResult)
{
    return BaseStream.EndRead(asyncResult);
}

public override void EndWrite(IAsyncResult asyncResult)
{
    BaseStream.EndWrite(asyncResult);
}

public override int ReadByte()
{
    return BaseStream.ReadByte();
}
И не говорите! Сам не в восторге. Метод инициализации базового индексированного хранилища теперь выглядит так:
protected override void Initialize()
{
    lock (SyncRoot)
    {
        base.Initialize();

        _index.Initialize(LoadKeys(), ReadKey);
    }
}
Еще пара нехитрых методов базового индексированного хранилища:
protected int GetIndex(TKey key)
{
    lock(SyncRoot)
    {
        return _index.GetIndex(key);
    }
}

protected TRow InsertRow(TKey key)
{
    int newRowIndex;
    lock(SyncRoot)
    {
        newRowIndex = Length;

        _index.InsertIndex(key, newRowIndex);

        Length += 1;
        WriteValue(_keyColumn, newRowIndex, key);
    }

    return GetRow(newRowIndex);
}
Осталось чуть-чуть. Уже добрались до конкретного индексированного хранилища:
class IndexedStorage : IndexedStorage<TestHeader, TestRow, Guid>
{
    public IndexedStorage(Stream stream)
        : base(
            stream,
            Encoding.Default,
            TestRow.IdColumn,
            new DictionaryIndex<Guid>())
    {
        base.Initialize();
    }

    public TestRow AddNewRow(Guid id)
    {
        return InsertRow(id);
    }

    public TestRow GetRow(Guid id)
    {
        return GetRow(GetIndex(id));
    }
}
Класс TestRow описан в конце первого поста о хранилище записей.
В следующий раз приведу результаты стресс-тестирования этого безобразия в сравнении с SQLite.

Комментариев нет:

Отправить комментарий