Добавление временной составляющей к данным с помощью Temporal SQLAlchemy

Иногда возникает необходимость организовать хранение данных таким образом, чтобы всегда иметь четкое представление о том, как они менялись с течением времени. Статья рассматривает пример решения подобной задачи с помощью Temporal SQLAlchemy.

Существует несколько способов добавить временную составляющую к данным. Например, SQLAlchemy Continuum использует таблицу истории для каждой версионируемой сущности. В модель можно добавить такие поля, как date_created и date_modified . Но сегодня мы поговорим о Temporal SQLAlchemy — решении, которое использует таблицу для каждого свойства и счетчик времени для сущностей: все состояния фиксируются в таблице сущности и в дополнение к этому делаются записи в последовательности таблиц истории (для каждого свойства) и счетчике времени (для каждого объекта).

1. Постановка задачи

Мы хотим иметь возможность получить историю изменения места жительства друзей, используя подобный код:

new_friend = Friend("Rachel", location="New York")
session.add(new_friend)
session.commit()

# несколько проходов

friend = session.query(Friend).filter_by(name="Rachel").one()
with friend.clock_tick():
    friend.location = "San Francisco"

session.commit()
friend.location_history[0] == "New York"
friend.location_history[1] == friend.location == "San Francisco"

2. Определение таблиц

2.1. Таблицы с историей версий

Friends
id name location
1 Dave Berkeley
2 Rachel San Francisco
3 Will Oakland
Friends History
id name location timestamp
1 Dave Tucson 20100913
1 Dave San Francisco 20130502
2 Rachel New York 20141008
3 Will San Francisco 20150922
2 Rachel San Francisco 20160108
1 Dave Berkeley 20160819
3 Will Oakland 20160929

2.2. Отслеживание свойств

Friends
id name location vclock
1 Dave Berkeley 3
2 Rachel San Francisco 2
3 Will Oakland 2
Clock
friend tick timestamp
1 1 20100913
1 2 20130502
2 1 20141008
3 1 20150922
2 2 20160108
1 3 20160819
3 3 20160929
Location
friend location vclock
1 Tucson 1, 2
1 San Francisco 2, 3
2 New York 1, 2
3 San Francisco 1, 2
2 San Francisco 2, ∞
1 Berkeley 3, ∞
3 Oakland 3, ∞

3. Создание основных таблиц

3.1. Создание friend и friend_clock

CREATE TABLE friend (
 id serial PRIMARY KEY,
 name text,
 location text,
 vclock int
);
CREATE TABLE friend_clock (
 friend_id integer REFERENCES friend (id),
 tick int,
 timestamp timestamp WITH time zone,
 PRIMARY KEY (friend_id, tick)
);

3.2. Добавление ограничений-исключений

Обратите внимание: используется модуль btree_gist. Он предоставляет показательные классы операторов GiST, реализующие поведение, подобное тому, что реализуют обычные классы B-дерева.

CREATE EXTENSION btree_gist;
CREATE TABLE friend_location_history (
 id serial PRIMARY KEY,
 friend_id integer REFERENCES friend (id),
 vclock int4range,
 location text,
 EXCLUDE USING gist (friend_id WITH =, vclock WITH &&)
);

Пример работы ограничений-исключений:

-- начинаем запись истории для Dave
INSERT INTO friend_location_history (friend_id, location, vclock)
VALUES
(1, 'Tucson', int4range(1, null));
INSERT 0 1
-- Невозможно жить одновременно в двух местах!
INSERT INTO friend_location_history (friend_id, location, vclock)
VALUES
(1, 'San Francisco', int4range(2, null));
ERROR: conflicting key value violates exclusion constraint
"friend_location_history_friend_id_vclock_excl"
DETAIL: Key (friend_id, vclock)=(1, [2,)) conflicts with existing key (friend_id,
vclock)=(1, [1,)).

4. Создание модели на основе TemporalModel

Опустим чистый SQL, перейдем сразу к Alchemy.

import sqlalchemy as sa
import sqlalchemy.ext.declarative as sa_decl
import temporal_sqlalchemy as temporal

Base = sa_decl.declarative_base()


class Friend(Base, temporal_sqlalchemy.TemporalModel):
    __tablename__ = 'friend'
    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.Text)
    location = sa.Column(sa.Text)

    class Temporal:
        track = ('name', 'location',)
        schema = 'history_schema'

Теперь код, который был продемонстрирован в начале статьи, должен работать.

5. Интроспекция TemporalModel

Напоследок разберемся, что же «под капотом» TemporalModel.

5.1. Модель

class TemporalModel:
    vclock = sa.Column(sa.Integer, default=1)

    @contextlib.contextmanager
    def clock_tick(self, activity: TemporalActivityMixin = None):
        ...

    @staticmethod
    def temporal_map(mapper: orm.Mapper, cls):
        ...

    @staticmethod
    def init_clock(clocked: 'TemporalModel', args, kwargs):
        ...

    @declarative.declared_attr
    def __mapper_cls__(cls):
        ...

5.2. Объявленные атрибуты

@declarative.declared_attr
def __mapper_cls__(cls):
    assert hasattr(cls, 'Temporal')

    def mapper(cls, *args, **kwargs):
        mp = orm.mapper(cls, *args, **kwargs)
        cls.temporal_map(mp, cls)
        return mp
    return mapper

5.3. Отображение для временной составляющей

@staticmethod
def temporal_map(mapper: orm.Mapper, cls):
    # 1. Происходит получение всего того, что определено в Temporal.
    # 2. Проверяется, что все временные свойства имеют active_history (всегда загружены).
    # 3. Используется для построения новой модели счетчика времени для этого объекта.
    # 4. Для каждого свойства создаются модели для хранения истории.
    event.listen(cls, 'init', TemporalModel.init_clock)

5.4. Инициализация истории

@staticmethod
def init_clock(clocked, args, kwargs):
    kwargs.setdefault('vclock', 1)
    initial_tick = clocked.temporal_options.clock_model(
        tick=kwargs['vclock'],
        entity=clocked,
    )
    if 'activity' in kwargs:
        initial_tick.activity = kwargs.pop('activity')

5.5. Обновление истории

@contextlib.contextmanager
def clock_tick(self, activity: TemporalActivityMixin = None):
    """Увеличивает vclock на 1 с изменениями, привязанными к сесиии"""
    if self.temporal_options.activity_cls is not None and activity is None:
        raise ValueError("activity is missing on edit") from None

    session = orm.object_session(self)
    with session.no_autoflush:
        yield self

    if session.is_modified(self):
        self.vclock += 1

    new_clock_tick = self.temporal_options.clock_model(entity=self, tick=self.vclock)
    if activity is not None:
        new_clock_tick.activity = activity

    session.add(new_clock_tick)

5.6. Декоратор

def add_clock(*props: typing.Iterable[str],
    activity_cls: nine.Type[TemporalActivityMixin] = None,
    temporal_schema: typing.Optional[str] = None):
    # все те же вещи, что и выше

6. Пример работы с диапазонами

-- Вхождение
SELECT int4range(10, 20) @> 3;
-- Совмещение
SELECT numrange(11.1, 22.2) && numrange(20.0, 30.0);
-- Получение верхней границы
SELECT upper(int8range(15, 25));
-- Вычисление перересечения
SELECT int4range(10, 20) * int4range(15, 25);
-- Проверка диапазона на пустоту
SELECT isempty(numrange(1, 5));

Выводы

  • Если нужно хранить историю изменений, придется идти на компромиссы.
  • Множество моделей и свойств приводит к большому количеству таблиц.
  • Выполнять операции в пакетном режиме очень сложно.

См. также документацию на Temporal Sqlalchemy.

Примеры взяты из презентации Joseph Leingang (Engineering Manager at Clover Health), представленной на PyCon CA 2017.