Phoebus — распределенная обработка огромных графов
07/10/2010 09:32
Phoebus — это система распределенной обработки графов, состоящих из миллиардов вершин и ребер. По сути на является реализацией системы Pregel, описаной разработчиками из Google.
Phoebus так же поддерживает вычисления по принципу MapReduce, но применительно к графам.
Вычислительная модель
- Граф разбит на группы записей (Records)
- Запись содержит вершину (Vertex) и все исходящие ребра (Edges). При этом ребро — это кортеж, состоящий из веса ребра и названия конечной вершины
- Пользователь (User) определяет «вычислительную» функцию ('Compute' function), которая будет применена к каждой записи
- Вычисления на графе происходят в виде последовательности «супер шагов» (Super Steps)
- На каждом «супершаге» функция применяется ко всем «активным» вершинам графа.
- Вершины общаются между собой при помощи сообщений
- В вычислительную функцию передается запись, содержащая текущую вершину, и все сообщения, переданные этой вершине в предыдущем «супер шаге».
- Вычислительная функция может:
- Изменять значение, связанное с вершиной
- Добавлять/удалять исходящие ребра
- Изменять вес ребра
- Посылать сообщения любым другим вершинам в графе
- Менять состояние текущей вершины с «активная» (active) на «приостановлена» (hold)
- В начале каждого «супершага», если больше нет активных вершин, а также не осталось сообщений, направленных каким-либо вершинам, алгоритм прекращает работу.
- Пользователь может указать MaxSteps — максимальное количество «супершагов», после которых алгоритм прекратит работу.
- Пользователь также может указать комбинирующую функцию ('Combine' function), которая будет применена ко всем сообщения, посланным текущей вершине, до того, как будет вызвана вычисляющая функция.
Распределенные вычисления
- Вычислительная модель позволяет выполнять алгоритм параллельно в кластере phoebus-узлов.
- «Задание» ('Job'), переданное в кластер, управляется главным процессом (Master process), который запускается на узле, получившем задание.
- Главный процесс расчленяет граф на сегменты и для каждого сегмента запускает рабочий процесс ('Worker') на одном из узлов в кластере.
- Главный процесс запускает в рабочем процессе «супершаг», обрабатывающий данные в сегменте, соответствующем данному рабочему процессу, и ожидает сообщения о завршении шага.
- Номер шага увеличивается на единицу до тех пор, пока пока все рабочие процесы не сообщат, что у них больше нет активных вершин и больше нет никаких сообщений
Как с этим работать (проверено на Mac OS X Snow Leopard)
Требования:
- rebar (http://hg.basho.com/rebar/downloads/rebar)
- git
- erlang (проверено на R13B04)
- Клонируем код с GitHub'а
$ git clone git://github.com/xslogic/phoebus.git $ cd phoebus
- Компилируем и создаем release-версию
$ rebar compile ; rm -rf rel/phoebus ; rebar generate ; chmod +x ./rel/phoebus/bin/phoebus ; ./rel/phoebus/bin/phoebus ==> rel (compile) ==> phoebus (compile) .... ==> rel (generate) Usage: phoebus {start|stop|restart|reboot|ping|console|attach} - Создаем директорию, где будут храниться файлы
$ mkdir /tmp/output
- Запускаем кластер из двух узлов
Terminal 1: $ env A_FILE=$PWD/vm1.args ./rel/phoebus/bin/phoebus console ..... Erlang R13B04 (erts-5.7.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:5] [hipe] [kernel-poll:true] Eshell V5.7.5 (abort with ^G) (phoebusr1@my-machine)1> Terminal 2: $ env A_FILE=$PWD/vm2.args ./rel/phoebus/bin/phoebus console ..... Erlang R13B04 (erts-5.7.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:5] [hipe] [kernel-poll:true] Eshell V5.7.5 (abort with ^G) (phoebusr2@my-machine)1>
- Создаем первоначальные данные: На данный момент Phoebus требует, чтобы каждая запись (Record) была на отдельной строке. Записи должны быть в виде
<VertexName>\t<VertexValue>\t<EdgeWeight1>\t<TargetVertexName1>\t<EdgeWeight2>\t<TargetVertexName2>...\n
Модуль algos из Phoebus содержит функцию, которая может сгенерировать двоичное дерево в качестве примера таких данных.
(phoebusr1@my-machine)1> algos:create_binary_tree("/tmp/input", 4, 1000). ok The create_binary_tree function has created an input data set in the directory "/tmp/input". It has created a 1000 node binary tree with root as "1" It has split the input into 4 files. $ head -5 infile1 1 1 1 2 1 3 2 2 1 4 1 5 3 3 1 6 1 7 4 4 1 8 1 9 5 5 1 10 1 11 - Запускаем простой алгоритм:
Модуль "algos" содержит простую вычислительную функцию, которая определяет наикратчайший путь к вершине.
(phoebusr1@my-machine)1> AFun = fun algos:shortest_path/2. #Fun (phoebusr1@my-machine)1> phoebus_master:start_link([{name, "first_ever"}, {max_steps, 100}, {algo_fun, AFun}, {input_dir, "file:///tmp/input/"}, {output_dir, "file:///tmp/output/"}]). okтак как на вход было передано 4 файла, Phoebus создала 4 рабочих процесса, по 2 на каждом узле. - Ждем завершения алгоритма. Каогда он будет завершен, результат будет записан в /tmp/output. выведем все вершины, которые начинаются на "20"...
$ cat /tmp/output/* | grep '^20' 200 100:50:25:12:6:3:1 1 400 1 401 204 102:51:25:12:6:3:1 1 408 1 409 20 10:5:2:1 1 40 1 41 201 100:50:25:12:6:3:1 1 402 1 403 203 101:50:25:12:6:3:1 1 406 1 407
Вторая колонка показывает наикратчайший путь к вершине, начиная с корня двоичного дерева
В будущем
- На данный момент Phoebus подерживает только локальную файловую систему. Необходима поддержка распределенных ФС (HDFS, DDFS)
- Улучшение устойчивости к падениям. Если рабочий процесс умирает, главный процесс должен передать его работу другим процессам
- Разработтчики Pregel говорят об 'Aggregate' Function. Необходимо ее реализовать
- Поддержка задач, написанных на Питоне
Проект доступен на GitHub'е: http://github.com/xslogic/phoebus
