Хочу поделиться найденным способом работы с курсорами в БД для последовательного поиска записей. Актуально для таблиц типа bag очень боль?ого объема, да и для других случаев, когда использование индексов невозможно.
В примере ниже идет выбор записи по полю типа now() - время. ?щется запись с наиболее близким в боль?ую сторону временем к требуемому. Т.е. если предположить, что таблица отсортирована по времени, то требуется вернуть запись со временем, минимальным из всех боль?их указанного в запросе. Необходимо выполнить серию последовательных по времени запросов, например доставать записи, отличающиеся примерно на секунду по времени.
Обычными запросами к БД действовать здесь было нельзя, т.к. объем таблицы - от гига, а допустимое время на получение ответа ограничено. Раз в 500мс найти запись за очередные 500 мс.
Таблица в mnesia построена на основе записи rec. Тип таблицы - bag.
Таблица имеет неуникальный вне?ний индекс ext_id.
-record(rec, {
ext_id,
dt,
data
}).
%===========================
% API
%===========================
set_cursor(ExtId) ->
spawn_link(fun() -> long_trans(ExtId) end).
find(Pid, Dt) ->
Pid ! {find, self(), Dt},
receive
{answer, Pid, Data} -> Data
end.
del_cursor(Pid) ->
Pid ! bye.
Порядок работы:
1. ?нициализировать поиск методом set_cursor/1. В его параметре указывается значение вне?него индекса, для которого требуется перебор по времени.
2. Вызывать find/2 много раз подряд с постоянно увеличивающимся значением параметра Dt - время (типа now()).
3. Удалить курсор вызовом del_cursor/1.
В функции set_cursor создается процесс, в котором будет крутиться транзакция чтения из БД. Транзакция будет жить до тех пор, пока не выполнится вызов del_cursor. В данном примере ExtId не играет особой роли, просто показывает, что запрос в курсоре может быть и навороченным.
Функция find отправляет внутрь транзакции сообщение с параметрами очередного поиска {find, pid(), Dt}, после чего ждет ответ {answer, pid(), Data}. ?дентификаторы процессов здесь нужны были потому, что параллельно выполнялось много таких транзакций.
long_trans(Id) ->
F = fun() ->
C = qlc:cursor(qlc:q([X || X <- mnesia:table(rec),
X#rec.ext_id =:= Id])),
loop(C, []),
qlc:delete_cursor(C)
end,
mnesia:transaction(F).
Здесь создается курсор по запросу (X#rec.ext_id =:= Id) и запускается основной цикл работы с ним - loop. ОСНОВНАЯ ?ДЕЯ как раз внутри этой функции: с курсором можно работать ТОЛЬКО в пределах одной транзакции. А нам нужно снять это ограничение и выполнять последовательный перебор данных из вне?него процесса, вне транзакции.
loop(C, Buf) ->
receive
{find, Pid, Dt} ->
{Data, NewBuf} = rec_by_time(C, Buf, Dt),
Pid ! {answer, self(), Data},
loop(C, NewBuf);
bye -> ok
end.
Основной цикл процесса транзакции чтения данных. Ждет либо очередных параметров поиска - {find...}, либо команды завер?ения транзакции - bye. Очередной поиск происходит в функции rec_by_time.
rec_by_time(C, [], Dt) ->
case qlc:next_answers(C,10) of
[] -> {not_found, []};
List -> rec_by_time(C, List, Dt)
end;
rec_by_time(_C, [H|T], Dt) when H#rec.dt >= Dt ->
{H, T};
rec_by_time(C, [_|T], Dt) ->
rec_by_time(C, T, Dt).
Функция возвращает либо найденную запись, либо атом not_found, если поиск закончился не удачно. Считывается очередной блок данных (10 записей) из курсора - qlc:next_answer и в нем выполняется поиск (H#rec.dt >= Dt). Кроме результата поиска функция возвращает остав?ийся кусок буфера,
начиная с которого будет выполняться следующий поиск.
Кроме всего прочего здесь обходится невозможность выполнения операция сравнения значений типа now() в запросе qlc:q(...). Не понимаю,
для чего это сделано, но вызов типа
qlc:q([X || X <- mnesia:table(rec), X#rec.dt >= Dt])
приводит к исключению bad_params.
Примерно вот так. Если что-то не понятно - интересуйтесь, поясню.`
