Apache Ignite — вычисления в гриде

Вычисления в гриде или майнинг «красивых» хешей, такую задачу я решил проверить для вычисления в гриде Apache Ignite. Ранее я пробовал и писал Ignite как Sql БД, но для себя я понял что это пока удобная опция в этой вычислительной системе (к SQL на Ignite я еще вернусь), именно так как вычислительная система я себе ее представляю с возможностью быстрой и не дорогой масштабируемостью. Вот это и посмотрим, как можно быстро и недорого нарастить вычисления, или нет, например нарастить вычисления 1 мощного компьютера добавляя к нему несколько слабых.

Задача такая, для блоков данных-транзакций вычислить хеш, но не простой, а с некоторой сложностью, например содержащий подряд семь символов 'А'. Что бы это было возможно, к блоку данных будем приклеивать постоянно увеличивающейся в цикле число, пока не будет получен хеш заданной сложности. Да, это похоже как делают майнеры добывая крипто валюту. Поскольку у меня несколько транзакций я их буду отправлять в вычислительный грид. Вычислительный грид это ноды Ignite, запущенные экземпляры на разных компьютерах, они сами себя обнаруживают и образуют грид. Распределяться эти вычисления между нодами будут равномерно и автоматически.

И так мои вычислительные мощности, в домашних условиях это:

Intel Core I5–7400 3,5 Ггц 8 Гб. ОЗУ
Intel Core I3–6100 3,7 Ггц 8 Гб. ОЗУ
Intel Core 2Duo E6550 2,3 Ггц 8 Гб. ОЗУ
На каждом из них будет запускаться Ignite.

Вот один из шаблонов которые предлагает Ignite для вычисления в гриде
//Стартуем клиента
Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {

    //Подготовка к вычислениям, создание задач 
    //Проходим через все задания и создаем задачу на выполнение, добавляем в список
    for (final T item: list) {
        calls.add(new IgniteCallable() {
            @Override public String call() throws Exception {
                //Вот этот код будет выполняться на нодах, на разных компьютерах в сети 
                return result;
            }
        });
    }

    // Запускаем список заданий на ноды грида
    Collection res = ignite.compute().call(calls);

    // Получаем результат от каждой ноды 
    for (String hash : res) {
        System.out.println(">>> Check all nodes for output : " + result);
    }
}


Вот код который будет вычисляться на нодах грида (public String call ())
                calls.add(new IgniteCallable() {
                    @Override public String call() throws Exception {
                        System.out.println();
                        System.out.println(">>> Id  transaction=#" + transaction.getId() + " on this node from ignite job.");

                        MessageDigest md = MessageDigest.getInstance("MD5");
                        String transactHash;
                         // ищем красивый хеш
                        do {
                            md.update(transaction.getDifficultyContent().getBytes());
                            byte[] digest = md.digest();
                            transactHash = DatatypeConverter.printHexBinary(digest).toUpperCase();
                            // увеличиваем сложность
                            transaction.setDifficulty(transaction.getDifficulty() + 1);
                        } while (!transactHash.contains("AAAAAAA"));

                        return transactHash;
                    }
                });


Полный код
public class MyComputeCallable {
    // Данные для транзакции 
    public static final String LOREM_IPSUM = "Lorem ipsum dolor sit amet, consetetur sadipscing elitr,# " +
            "sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua.#" +
            "At vero eos et accusam et justo duo dolores et ea rebum.#" +
            "Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.";

    /**
     * Executes example.
     *
     * @param args Command line arguments, none required.
     * @throws IgniteException If example execution failed.
     */
    public static void main(String[] args) throws IgniteException {
        String[] loremIpsum = LOREM_IPSUM.split("#");
        List transactionList = new ArrayList<>();
        for (int i= 0; i <= 10; i++) {
            transactionList.add(i, new Transaction(i, loremIpsum[i % 4]));
        }

        Ignition.setClientMode(true);

        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
            System.out.println();
            System.out.println(">>> Compute callable example started.");

            Collection> calls = new ArrayList<>();

            // Iterate through all words in the sentence and create callable jobs.
            for (final Transaction transaction : transactionList) {
                calls.add(new IgniteCallable() {
                    @Override public String call() throws Exception {
                        System.out.println();
                        System.out.println(">>> Id  transaction=#" + transaction.getId() + " on this node from ignite job.");

                        MessageDigest md = MessageDigest.getInstance("MD5");
                        String transactHash;
                        do {
                            md.update(transaction.getDifficultyContent().getBytes());
                            byte[] digest = md.digest();
                            transactHash = DatatypeConverter.printHexBinary(digest).toUpperCase();
                            // увилмваем сложность
                            transaction.setDifficulty(transaction.getDifficulty() + 1);
                        } while (!transactHash.contains("AAAAAAA"));

                        return transactHash;
                    }
                });
            }

            // Execute collection of callables on the ignite.
            long millis = System.currentTimeMillis();
            Collection res = ignite.compute().call(calls);

            System.out.println();
            // individual received from remote nodes.
            for (String hash : res) {
                System.out.println(">>> Check all nodes for output hash: " + hash);
            }

            System.out.println(">>> Total msec: " + (System.currentTimeMillis() - millis));
        }
    }
}
//----------------------- Transaction   ---------------------------------
public class Transaction {
    private int difficulty;
    private int id;
    private String content;

    public Transaction(int id, String content) {
        this.id = id;
        this.content = content;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getDifficulty() {
        return difficulty;
    }

    public void setDifficulty(int difficulty) {
        this.difficulty = difficulty;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getDifficultyContent() {
        return "" + difficulty + content;
    }

}


Результат вычислений для 11-и транзакций на одной ноде (один компьютер), код выше

Компьютер
Результат msec.
I5
40 909
I3
57 162
2 Duo
140 673


Приятно что мощности со времен 2Duo выросли. А вот так это выглядело на картинках с I5

Консоль Ignite
image
Видно что к серверу подключился клиент (srevers=1, clients=1), на ноду прилетели транзакции для вычислений (всего 11) >>> Id transaction…
по окончании клиент отключился (srevers=1, clients=0)


Вывод программы (красивые хеши)
image
А вот вычисленные «красивые» хеши


А теперь начнем наращивать мощность вычисления т.е. запускать на нескольких компьютерах Ignite server. Поскольку пример реализован так, что ожидает выполнение всех задач, то окончанием будет вычислением на последней ноде, и если мы подключаем слабые компьютеры к сильным, результат будет падать (по крайне мере вначале).

Результат вычислений на нескольких нодах.

Ноды
Результат msec.
I5+I3
44 389
I5+I3+2Duo
68 892


Видно например что добавив к I5, I3 результат стал ниже чем для одного I5, но лучше чем один I3. Таким образом время вычисления для данной реализации будет мериться по слабой ноде и равно времени за которое нода обработает свою порцию задач. Что бы получить время вычисления в комбинации I5, I3 стала лучше чем на одном I5, нужно понять на каком количестве транзакций I3 покажет время лучшее чем I5 для всех. Экспериментально быстро установил, что I3 порцию из 4–5 транзакций обрабатывает также или лучше как I5 все 11 транзакций, таким образом такие порции возможны когда в гриде будут 3 ноды — I5 + I3 +I3, мои ожидания, что мой грид вычислит эту задачу ~30 сек.(против 40 сек. на одном I5), вот такая масштабируемость.

Ну, а добавляя к слабым компьютерам в гриде мощные, конечно сразу получаем увеличение. Один 2Duo считал 140 сек, а в гриде с другими за 68 сек.

Так выглядит одна из консолей Ignite для трех нод в гриде

image

Показывает 3 сервера, один клиент который рассылает на них задачи, CPU показывает как сумму с трех компьютеров, ну память тоже как сумму. Видно что нода получила 4-ре задачи из 11-и (транзакций), по окончании остались три сервера.

В целом распределенные задачи решаются здорово, предлагаются различные шаблоны, с разными возможностями. Далее хочу вернуться к SQL в Ignite и поработать с кешам, напишу…

Топология
image
image


Материалы:

Ignite Getting Started

© Habrahabr.ru