Динамическая балансировка нагрузки в pull-схеме

В прошлой новости про принципы работы коллекторов логов PostgreSQL я упомянул, что одним из недостатков pull-модели является необходимость динамической балансировки нагрузки. Но если делать ее аккуратно, то недостаток превращается в достоинство, а система в целом становится гораздо более устойчивой к изменениям потока данных.

vbhdsswisljqtm6dz_ssvg82ubs.png


Давайте посмотрим, какие решения есть у этой задачи.

Распределение объектов «по мощности»


Чтобы не углубляться в неинтересные абстракции, будем рассматривать на примере конкретной задачи — мониторинга. Соотнести предлагаемые методики на свои конкретные задачи, уверен, вы сможете самостоятельно.

«Равномощные» объекты мониторинга


В качестве примера можно привести наши коллекторы метрик для Zabbix, которые исторически имеют с коллекторами логов PostgreSQL общую архитектуру.

И правда, каждый объект мониторинга (хост) генерирует для zabbix практически стабильно один и тот набор метрик с одной и той же частотой все время:

egwfsnki9xsemqml77l69vtuj6e.png


Как видно на графике, разница между min-max значениями количества генерируемых метрик не превышает 15%. Поэтому мы можем считать все объекты равными в одинаковых «попугаях».

Сильный «дисбаланс» между объектами


В отличие от предыдущей модели, для коллекторов логов наблюдаемые хосты совсем не являются однородными.

Например, один хост может генерировать в лог миллион планов за сутки, другой десятки тысяч, а какой-то — и вовсе единицы. Да и сами эти планы по объему и сложности и по распределению во времени суток сильно отличаются. Так и получается, что нагрузку сильно «качает», в разы:

ygspvos5amvcntisi4scjlxpj4s.png


Ну, а раз нагрузка может меняться настолько сильно, то надо учиться ей управлять

Координатор


Сразу понимаем, что нам явно понадобится масштабирование системы коллекторов, поскольку один отдельный узел со всей нагрузкой когда-то точно перестанет справляться. А для этого нам потребуется координатор — тот, кто будет управлять всем зоопарком.

Получается примерно такая схема:

dzp68sypbrwadggud5sgbr-apt8.png


Каждый worker свою нагрузку «в попугаях» и в процентах CPU периодически сбрасывает master’у, те — коллектору. А он, на основании этих данных, может выдать команду типа «новый хост посадить на ненагруженный worker#4» или «hostA надо пересадить на worker#3».

Тут еще надо помнить, что, в отличие от объектов мониторинга, сами коллекторы имеют вовсе не равную «мощность» — например, на одном у вас может оказаться 8 ядер CPU, а на другом — только 4, да еще и меньшей частоты. И если нагрузить их задачами «поровну», то второй начнет «затыкаться», а первый — простаивать. Отсюда и вытекают…

Задачи координатора


По сути, задача всего одна — обеспечивать максимально равномерное распределение всей нагрузки (в %cpu) по всем доступным worker’ам. Если мы сможем решить ее идеально, то и равномерность распределения %cpu-нагрузки по коллекторам получим «автоматом».

Понятно, что, даже если каждый объект генерирует одинаковую нагрузку, со временем какие-то из них могут «отмирать», а какие-то возникать новые. Поэтому управлять всей ситуацией надо уметь динамически и поддерживать баланс постоянно.

Динамическая балансировка


Простую задачу (zabbix) мы можем решить достаточно банально:

  • вычисляем относительную мощность каждого коллектора «в задачах»
  • делим все задачи между ними пропорционально
  • между worker’ами распределяем равномерно
cg2t9g0op1111esdysa0mfeajba.png


Но что делать в случае «сильно неравных» объектов, как для коллектора логов?…

Оценка равномерности


Выше мы все время употребляли термин »максимально равномерное распределение», а как вообще можно формально сравнить два распределения, какое из них «равномернее»?

Для оценки равномерности в математике давно существует такая вещь как среднеквадратичное отклонение. Кому лениво вчитываться:

S[X] = sqrt( sum[ ( x - avg[X] ) ^ 2 of X ] / count[X] )


Поскольку количество worker’ов на каждом из коллекторов у нас тоже может отличаться, то нормировать разброс по нагрузке надо не только между ними, но и между коллекторами в целом.

То есть распределение нагрузки по worker’ам двух коллекторов [ (10%, 10%, 10%, 10%, 10%, 10%) ; (20%) ] — это тоже не очень хорошо, поскольку на первом получается 10%, а на втором — 20%, что как бы вдвое больше в относительных величинах.

Поэтому введем единую метрику-расстояние для общей оценки «равномерности»:

d([%wrk], [%col]) = sqrt( S[%wrk] ^ 2 + S[%col] ^ 2 )

То есть величины среднеквадратичного отклонения для наборов величин нагрузки по всем worker’ам и по всем коллекторам воспринимаем как координаты вектора, длину которого будем стараться минимизировать.

Моделирование


Если бы объектов у нас было немного, то мы могли бы полным перебором «разложить» их между worker’ами так, чтобы метрика оказалась минимальной. Но объектов у нас — тысячи, поэтому такой способ не подойдет. Зато мы знаем, что коллектор умеет «перемещать» объект с одного worker’а на другой — давайте этот вариант и смоделируем, используя метод градиентного спуска.

Понятно, что «идеальный» минимум метрики мы так можем и не найти, но локальный — точно. Да и сама нагрузка может изменяться во времени настолько сильно, что искать за бесконечное время «идеал» абсолютно незачем.

То есть нам осталось всего лишь определить, какой объект и на какой worker эффективнее всего «переместить». И сделаем это банальным переборным моделированием:

  • для каждой пары (целевой host, worker) моделируем перенос нагрузки
  • нагрузку от host внутри исходного worker’а считаем пропорционально «попугаям»
    В нашем случае за «попугая» оказалось вполне разумно взять объем получаемого потока логов в байтах.
  • относительную мощность между коллекторами считаем пропорциональной «суммарным попугаям»
  • вычисляем метрику d для «перенесенного» состояния


Выстраиваем все пары по возрастанию метрики. В идеале, нам всегда стоит реализовать перенос именно первой пары, как дающий минимальную целевую метрику. К сожалению, в реальности сам процесс переноса «стоит ресурсов», поэтому не стоит запускать его для одного и того же объекта чаще определенного интервала «охлаждения».

В этом случае мы можем взять вторую, третью, … по рангу пару — лишь бы целевая метрика уменьшалась относительно текущего значения.

Если же уменьшать некуда — вот он локальный минимум!

Пример на картинке:

5ogj_trgnsdo94y5ltn8knk29z0.png


Запускать итерации «до упора» при этом вовсе не обязательно. Например, можно делать усредненный анализ нагрузки на интервале 1 мин, и по его завершению делать единственный перенос.

Микро-оптимизации


Понятно, что алгоритм со сложностью T(целей) x W(процессов) — это не очень хорошо. Но в нем стоит не забыть применить некоторые более-менее очевидные оптимизации, которые его могут ускорить в разы.

Нулевые «попугаи»


Если на замеренном интервале объект/задача/хост сгенерировал нагрузку »0 штук», то его не то что перемещать куда-то — его даже рассматривать и анализировать не надо.

Самоперенос


При генерации пар нет необходимости оценивать эффективность переноса объекта на тот же самый worker, где он и так находится. Все-таки уже будет T x (W - 1) — мелочь, а приятно!

Неразличимая нагрузка


Поскольку мы моделируем все-таки перенос именно нагрузки, а объект — всего лишь инструмент, то пробовать переносить «одинаковый» %cpu нет смысла — значения метрик останутся точно те же, хоть и для другого распределения объектов.

То есть достаточно оценить единственную модель для кортежа (wrkSrc, wrkDst, %cpu). Ну, а «одинаковыми» вы можете считать, например, значения %cpu, совпадающие до 1 знака после запятой.

Пример реализации на JavaScript
var col = {
  'c1' : {
    'wrk' : {
      'w1' : {
        'hst' : {
          'h1' : 5
        , 'h2' : 1
        , 'h3' : 1
        }
      , 'cpu' : 80.0
      }
    , 'w2' : {
        'hst' : {
          'h4' : 1
        , 'h5' : 1
        , 'h6' : 1
        }
      , 'cpu' : 20.0
      }
    }
  }
, 'c2' : {
    'wrk' : {
      'w1' : {
        'hst' : {
          'h7' : 1
        , 'h8' : 2
        }
      , 'cpu' : 100.0
      }
    , 'w2' : {
        'hst' : {
          'h9' : 1
        , 'hA' : 1
        , 'hB' : 1
        }
      , 'cpu' : 50.0
      }
    }
  }
};

// вычисляем опорные метрики и нормализуем по "мощности"
let $iv = (obj, fn) => Object.values(obj).forEach(fn);
let $mv = (obj, fn) => Object.values(obj).map(fn);

// initial reparse
for (const [cid, c] of Object.entries(col)) {
  $iv(c.wrk, w => {
    w.hst = Object.keys(w.hst).reduce((rv, hid) => {
      if (typeof w.hst[hid] == 'object') {
        rv[hid] = w.hst[hid];
        return rv;
      }
      // нулевые значения ничего не решают, поэтому сразу отбрасываем
      if (w.hst[hid]) {
        rv[hid] = {'qty' : w.hst[hid]};
      }
      return rv;
    }, {});
  });
  c.wrk = Object.keys(c.wrk).reduce((rv, wid) => {
    // ID воркеров должны быть глобально-уникальны
    rv[cid + ':' + wid] = c.wrk[wid];
    return rv;
  }, {});
}

// среднеквадратичное отклонение
let S = col => {
  let wsum = 0
    , wavg = 0
    , wqty = 0
    , csum = 0
    , cavg = 0
    , cqty = 0;

  $iv(col, c => {
    $iv(c.wrk, w => {
      wsum += w.cpu;
      wqty++;
    });
    csum += c.cpu;
    cqty++;
  });
  wavg = wsum/wqty;
  wsum = 0;
  cavg = csum/cqty;
  csum = 0;
  $iv(col, c => {
    $iv(c.wrk, w => {
      wsum += (w.cpu - wavg) ** 2;
    });
    csum += (c.cpu - cavg) ** 2;
  });
  return [Math.sqrt(wsum/wqty), Math.sqrt(csum/cqty)];
};

// метрика-расстояние
let distS = S => Math.sqrt(S[0] ** 2 + S[1] ** 2);

// выбираем оптимальный перенос и моделируем его
let iterReOrder = col => {
  let qty = 0
    , max = 0;

  $iv(col, c => {
    c.qty = 0;
    c.cpu = 0;
    $iv(c.wrk, w => {
      w.qty = 0;
      $iv(w.hst, h => {
        w.qty += h.qty;
      });
      w.max = w.qty * (100/w.cpu);
      c.qty += w.qty;
      c.cpu += w.cpu;
    });
    c.cpu = c.cpu/Object.keys(c.wrk).length;
    c.max = c.qty * (100/c.cpu);
    qty += c.qty;
    max += c.max;
  });

  $iv(col, c => {
    c.nrm = c.max/max;
    $iv(c.wrk, w => {
      $iv(w.hst, h => {
        h.cpu = h.qty/w.qty * w.cpu;
        h.nrm = h.cpu * c.nrm;
      });
    });
  });

  // "текущее" среднеквадратичное отклонение
  console.log(S(col), distS(S(col)));

  // формируем набор хостов и воркеров
  let wrk = {};
  let hst = {};
  for (const [cid, c] of Object.entries(col)) {
    for (const [wid, w] of Object.entries(c.wrk)) {
      wrk[wid] = {
        wid
      , cid
      , 'wrk' : w
      , 'col' : c
      };
      for (const [hid, h] of Object.entries(w.hst)) {
        hst[hid] = {
          hid
        , wid
        , cid
        , 'hst' : h
        , 'wrk' : w
        , 'col' : c
        };
      }
    }
  }

  // реализация переноса нагрузки на целевой worker
  let move = (col, hid, wid) => {
    let w = wrk[wid]
      , h = hst[hid];

    let wsrc = col[h.cid].wrk[h.wid]
      , wdst = col[w.cid].wrk[w.wid];

    wsrc.cpu -= h.hst.cpu;
    wsrc.qty -= h.hst.qty;
    wdst.qty += h.hst.qty;
    // перенос на другой коллектор с "процентованием" нагрузки на CPU
    if (h.cid != w.cid) {
      let csrc = col[h.cid]
        , cdst = col[w.cid];
      csrc.qty -= h.hst.qty;
      csrc.cpu -= h.hst.cpu/Object.keys(csrc.wrk).length;
      wsrc.hst[hid].cpu = h.hst.cpu * csrc.nrm/cdst.nrm;
      cdst.qty += h.hst.qty;
      cdst.cpu += h.hst.cpu/Object.keys(cdst.wrk).length;
    }
    wdst.cpu += wsrc.hst[hid].cpu;
    wdst.hst[hid] = wsrc.hst[hid];
    delete wsrc.hst[hid];
  };

  // моделирование и оценка переноса для пары (host, worker)
  let moveCheck = (orig, hid, wid) => {
    let w = wrk[wid]
      , h = hst[hid];

    // тот же воркер - ничего не делаем
    if (h.wid == w.wid) {
      return;
    }

    let col = JSON.parse(JSON.stringify(orig));
    move(col, hid, wid);

    return S(col);
  };

  // хэш уже проверенных переносов (hsrc,hdst,%cpu)
  let checked = {};

  // перебираем все возможные пары (какой хост -> на какой воркер)
  let moveRanker = col => {
    let currS = S(col);
    let order = [];

    for (hid in hst) {
      for (wid in wrk) {
        // нет смысла пробовать повторно перемещать одну и ту же (с точностью до 0.1%) "мощность" между одной парой воркеров
        let widsrc = hst[hid].wid;
        let idx = widsrc + '|' + wid + '|' + hst[hid].hst.cpu.toFixed(1);
        if (idx in checked) {
          continue;
        }
        
        let _S = moveCheck(col, hid, wid);
        if (_S === undefined) {
          _S = currS;
        }
        checked[idx] = {
          hid
        , wid
        , S : _S
        };
        order.push(checked[idx]);
      }
    }
    order.sort((x, y) => distS(x.S) - distS(y.S));
    return order;
  };

  let currS = S(col);
  let order = moveRanker(col);

  let opt = order[0];
  console.log('best move', opt);

  // реализуем перенос
  if (distS(opt.S) < distS(currS)) {
    console.log('move!', opt.hid, opt.wid);
    move(col, opt.hid, opt.wid);
    console.log('after move', JSON.parse(JSON.stringify(col)));
    return true;
  }
  else {
    console.log('none!');
  }
  return false;
};

// пока есть что-куда переносить
while(iterReOrder(col));


В результате, нагрузка по нашим коллекторам распределяется практически одинаково в каждый момент времени, оперативно нивелируя возникающие пики:

l5z7ww2sfoqucmo2-zszr9hxtvu.png

© Habrahabr.ru