Ortools — библиотека для решения задачи VRP

Привет! Меня зовут Илья Набатчиков, я MLE в компании Kamaz Digital. Также я являюсь учусь в онлайн магистратуре на базе университета ИТМО @ai-talent.

Сегодня я хочу рассказать о библиотеке ortools для решения проблемы маршрутизации транспортных средств с учетом ограничений по времени и грузоподъемности (CVRPTW).

И самое важно поделиться парой важных фичей, которых вы не найдете в документации.

Что такое VRP и CVRPTW?

VRP (Vehicle Routing Problem) — это одна из классических задач оптимизации в области логистики, связанная с поиском оптимальных маршрутов для транспортных средств с целью доставки товаров клиентам или выполнения других задач с минимизацией затрат. Когда у транспортных средств есть ограниченная грузоподъемность, а у клиентов есть временные интервалы, в которые необходимо осуществить доставку, проблема становится проблемой маршрутизации транспортных средств с учетом ограничений по времени и грузоподъемности (CVRPTW).

В данной статье мы обсудим, как решить CVRPTW, чтобы получить быстрое и надежное решение с использованием библиотеки ortools.

Постановка задачи

В классической постановке задачи VRP транспортные средства (далее ТС) стартуют из одной точки — депо и соответственно должны вернуться в него после посещения всех заказов, которые известны заранее.

Мы рассмотрим более сложную задачу. Пусть у нас есть сервис пассажирских перевозок. Имеем следующие условия:

  1. Заранее все точки заказов не известны, оптимизация запускается заново для каждого нового пассажира;

  2. Для упрощения расчетов возьмем 2 ТС вместимостью 8 человек;

  3. У пассажиров есть строгие интервалы посадки и высадки (временные окна). Для нового пассажира, также действуют ограничения на максимальное время ожидание = 30 минут и на время ожидания + время в пути = 45 минут;

  4. Точка старта маршрута — текущее положение ТС (в отличие от депо в классической постановке);

  5. Для упрощения возьмем одинаковые точки окончания маршрута ТС.

Математическая постановка задачи

Сначала мы сформулируем задачу как смешанную целочисленную программу. Затем решаем задачу для ряда доступных грузовиков, использующих данную формулу. Поскольку CVRPTW является NP-сложным, мы ожидаем, что время работы модели будет увеличиваться по мере увеличения количества доступных грузовиков. уменьшается.

\textbf{Множества:} \\ \begin{align*} & N \text{ = множество всех узлов (начальные и конечные точки)} \\ & P \text{ = множество точек посадки и высадки} \\ & D \text{ = множество точек разрыва} \\ & K \text{ = множество ТС } \end{align*}\textbf{Параметры:} \\  \begin{align*} & c_{ij} \text{ = время в пути между узлами } i \text{ и } j, c_{ij} \geq 0 \\  & q_i \text{ = спрос на узле } i,  -Q_k \leq q_i  \leq Q_k \\  & Q_k \text{ = вместимость ТС } k, Q_k > 0 \\ & TW_i \text{ = интервал времени для узла } i, TW_i \geq 0 \\ & O_i \text{ = узел } i \text{ обязателен для ТС } k, O_i \in K \\ & T_i \text{ = тип узла } i \text{ (start, onboard, pickup, delivery, finish)} \\ & R_k \text{ = начальный маршрут для ТС } k \\ & p \text{ = штраф за невключение необязательного узла в маршру } \end{align*}» src=«https://habrastorage.org/getpro/habr/upload_files/e34/54d/d46/e3454dd46c2d8813666e58f018714eea.svg» /><img alt=\textbf{Целевая функция:} \\ \begin{equation*}  \text{Minimize} \quad \sum_{k=1}^{K} \sum_{i=1}^N \sum_{j=1}^N c_{ij} \cdot x_{ijk} + \sum_{i}^{D} (1 - y_{ik}) \cdot p  \end{equation*}\textbf{Ограничения:} \\ \begin{align*} & \text{1. Каждый узел посещается ровно один раз:} \quad \sum_{i=1}^N \sum_{k=1}^{K} y_{ik} = 1, \quad \forall i \in N \\ & \text{2. Ограничение вместимости ТС:} \quad \sum_{i=1}^N  q_i \cdot y_{ik} \leq Q_k, \quad \forall k \\ & \text{3. Ограничение временного окна:} \quad TW_i \leq u_{ik} \leq TW_i, \quad \forall i \in N, \forall k \\ & \text{4. Последовательность посадки и высадки:} \quad \sum_{i}^P y_{ik} = 2 \cdot \sum_{(i,j)\in P}  x_{ijk}, , \quad \forall k \\ & \text{5. Ограничение для необязательных узлов:} \quad \sum_{i}^{D} y_{ik} \leq 1, \quad \forall i\in D, \forall k \end{align*}

Создание даты

Следующая функция создает данные для задачи.

def create_data_model():
    data = {}
    data["time_matrix"] = [
        [0, 6, 0, 6, 5, 6, 7, 6, 8, 2, 9, 2, 4, 9, 9, 4, 9, 9],
        [6, 0, 6, 0, 2, 2, 2, 1, 3, 3, 4, 3, 7, 11, 4, 4, 12, 12],
        [0, 6, 0, 6, 5, 6, 7, 6, 8, 2, 9, 2, 4, 9, 9, 4, 9, 9],
        [6, 0, 6, 0, 2, 2, 2, 1, 3, 3, 4, 3, 7, 11, 4, 4, 12, 12],
        [6, 0, 6, 0, 0, 2, 3, 2, 4, 4, 5, 3, 7, 10, 5, 3, 12, 12],
        [9, 4, 9, 4, 3, 0, 1, 2, 1, 6, 3, 6, 9, 12, 3, 6, 14, 14],
        [8, 4, 8, 4, 3, 2, 0, 1, 2, 5, 2, 5, 9, 12, 2, 5, 14, 14],
        [7, 2, 7, 2, 2, 0, 1, 0, 1, 4, 3, 4, 8, 11, 3, 4, 13, 13],
        [9, 4, 9, 4, 4, 2, 0, 2, 0, 6, 2, 6, 10, 13, 2, 6, 14, 14],
        [2, 5, 2, 5, 5, 4, 5, 4, 5, 0, 7, 3, 3, 10, 7, 6, 11, 11],
        [8, 4, 8, 4, 3, 2, 0, 1, 2, 5, 0, 5, 9, 12, 0, 5, 14, 14],
        [3, 4, 3, 4, 4, 4, 5, 4, 6, 2, 7, 0, 5, 9, 7, 3, 10, 10],
        [1, 7, 1, 7, 6, 6, 7, 6, 7, 2, 9, 4, 0, 9, 9, 5, 9, 9],
        [10, 11, 10, 11, 11, 13, 14, 13, 14, 10, 16, 10, 11, 0, 16, 9, 6, 6],
        [8, 4, 8, 4, 3, 2, 0, 1, 2, 5, 0, 5, 9, 12, 0, 5, 14, 14],
        [7, 5, 7, 5, 4, 6, 6, 5, 7, 5, 8, 4, 9, 9, 8, 0, 10, 10],
        [10, 12, 10, 12, 12, 14, 15, 14, 14, 10, 17, 10, 12, 4, 17, 10, 0, 0],
        [10, 12, 10, 12, 12, 14, 15, 14, 14, 10, 17, 10, 12, 4, 17, 10, 0, 0],
    ]
    data["num_vehicles"] = 2
    data["starts"] = [0, 1]
    data["ends"] = [16, 17]
    data["pickups_deliveries"] = [(10, 11), (12, 13), (14, 15)]
    data["obligations"] = [
        0, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0, 0, 1, 1, None, None, 0, 1
    ]
    data["demands"] = [
        0, 0, 3, 3, -1, -1, -1, -1, -1, -1, 1, -1, 1, -1, 1, -1, 0, 0
    ]
    data["vehicle_capacities"] = [8, 8]
    data["point_types"] = [
        "start", "start", "onboard", "onboard", "delivery", "delivery",
        "delivery", "delivery", "delivery", "delivery", "pickup", "delivery",
        "pickup", "delivery", "pickup", "delivery", "finish", "finish",
    ]
    data["time_windows"] = [
        (0, 0), (0, 0), (0, 0), (0, 0), (0, 9), (0, 11), (0, 11), (0, 4), 
        (0, 4), (0, 10), (8, 15), (0, 23), (9, 17), (0, 32), (4, 30), (9, 45),
        (0, 79), (0, 119),
    ]
    data["disjunction"] = [(14, 15)]
    data["initial_routes"] = [[2, 4, 6, 5, 10, 11], [3, 8, 7, 9, 12, 13]]
    return data

Где:

  • time_matrix: Массив времени между локациями в секундах;

  • num_vehicles: Количество ТС в автопарке;

  • starts: индексы узлов, в которых ТС начинают маршрут;

  • ends: индексы узлов, в которых ТС завершают маршрут;

  • pickups_deliveries: индексы узлов, для которых задается следующие ограничение — посадка должна происходить раньше высадки пассажира;

  • obligation: индексы ТС для каждого узла маршрута (для нового пассажира = None);

  • demands: изменение вместимости ТС при посещении узла;

  • vehicle_capacities: вместимость ТС;

  • point_types: информативный параметр, показывающий тип узла;

  • time_windows: интервал времени, в который нужно посетить определенный узел маршрута;

  • disjunction: индексы необязательных узлов, которые можно убрать из оптимизации для достижения сходимости решения;

  • initial_routes: маршруты ТС до оптимизации.

Python реализация

Начинаем написание нашего решения с вызова функции, которая создает data.

data = create_data_model()

Объявляем менеджер индексов (manager) и модель маршрутизации или солвер (routing).

manager = pywrapcp.RoutingIndexManager(
    len(data["time_matrix"]), data["num_vehicles"], data["starts"], data["ends"]
)
routing = pywrapcp.RoutingModel(manager)

Менеджер индексов необходим, для того перевести индексы созданные в data в индексы солвера. Менеджер преобразует индексы следующим образом:

  1. Узлы, в которых ТС начинают маршрут;

  2. Все другие узлы;

  3. Узлы, в которых ТС завершает маршрут.

Мы с вами специально расположили узлы так, чтобы индексы, созданные в data совпадали с индексами солвера.

Обратный вызов (callback)

Чтобы использовать солвер маршрутизации, нужно создать обратный вызов (callback) времени (или транзита) — функцию, которая принимает любую пару индексов узлов и возвращает время транзита между ними.
Следующая функция создает callback и регистрирует его в решателе как transit_callback_index. И устанавливаем одинаковую стоимость транзита для всех машин.

def time_callback(from_index, to_index):
    from_node = manager.IndexToNode(from_index)
    to_node = manager.IndexToNode(to_index)
    return data["time_matrix"][from_node, to_node]

transit_callback_index = routing.RegisterTransitCallback(time_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

Добавим временное измерение:

Данные измерение необходимо для ограничения временных окон.

routing.AddDimension(
    evaluator_index=transit_callback_index,
    slack_max=8 * 60,  # allow waiting time
    capacity=8 * 60,  # maximum time per vehicle
    fix_start_cumul_to_zero=False,
    name="Time",
)
time_dimension = routing.GetDimensionOrDie("Time")

Рассмотрим аргументы AddDimension:

  • slack_max — разрешенное время прибытия в узел раньше левой границы временного окна;

  • capacity — максимальное допустимое значение времени

  • fix_start_cumul_to_zero — Если True стартовый узел обязан иметь значение 0.

Временные окна

Фича №1

Назовем это не баг, а фича. Для точек финиша ТС manager.NodeToIndex (ind) возвращает значение -1, а как вы уже понимаете узла с таким индексом нет. Поэтому из сложившейся ситуации можно выйти с помощью routing.End (obligations[ind]). Данный метод вернет индекс узла финиша соответствующего ТС.

def get_index(
    manager: RoutingIndexManager,
    ind: int,
    routing: RoutingModel,
    obligations: list[int | None],
) -> int:
    if manager.NodeToIndex(ind) != -1:
        index_node = manager.NodeToIndex(ind)
    else:
        index_node = routing.End(obligations[ind])
    return index_node

Теперь можно задать ограничение на временные окна:

Так задаются жесткие ограничения на временные окна. В таком случае, если хоть одно ограничение нарушается, то оптимизация не сходится. А в рамках динамической системы, это очень критично ввиду опозданий ТС.

def time_window_constr(
    data: dict[str, Any],
    manager: RoutingIndexManager,
    routing: RoutingModel,
    time_dimension: RoutingDimension,
) -> None:
    for ind, (pt, tw) in enumerate(data["time_windows"]):
        index = get_index(manager, ind, routing, data["obligations"])
        time_dimension.CumulVar(index).SetRange(tw[0], tw[1])

Фича №2

Поэтому мы с вами сделаем мягкие окна. Отличие от жестких заключается в том, что при нарушении ограничения в целевую функцию добавляется штраф за каждую единицу, на которую мы превысили пороговое значение.

def time_window_constr(
    data: dict[str, Any],
    manager: RoutingIndexManager,
    routing: RoutingModel,
    time_dimension: RoutingDimension,
    penalty: int = 10000
) -> None:
    for ind, (pt, tw) in enumerate(zip(data["point_types"], data["time_windows"])):
        index = get_index(manager, ind, routing, data["obligations"])
        if pt not in ("delivery", "finish"):
            time_dimension.SetCumulVarSoftLowerBound(index, tw[0], int(10e6))
        time_dimension.SetCumulVarSoftUpperBound(index, tw[1], penalty * 2)

Фича №3

Солвер в библиотеке ortools не обязан попасть в каждый узел как можно ближе к левой границе временного окна. Он лишь обязуется не выходить за рамки временного окна. Поэтому нам нужно добавить следующий штраф:

span_coef = 100
time_dimension.SetSpanCostCoefficientForAllVehicles(span_coef)

span_cost = span_coef * (dimension end value — dimension start value)

Таким образом, span_cost будет добавляться в целевую функцию и солвер будет посещать узлы как можно ближе к левой границе.

Ограничение для необязательных узлов

Что делать, если из-за нового пассажира не сходится оптимизация? Правильно, его нужно убрать из оптимизации. Это можно сделать с помощью AddDisjunction.

def disjunction_constr(
    data: dict[str, Any],
    manager: RoutingIndexManager,
    routing: RoutingModel,
    penalty: int = 10000
) -> None:
    for pickup, delivery in data["disjunction"]:
        node_ids = [manager.NodeToIndex(pickup), manager.NodeToIndex(delivery)]
        routing.AddDisjunction(node_ids, penalty, 2)

В AddDisjunction — последний аргумент означает, что в оптимизацию включаются либо оба узла либо ни один из них. Однако за невключение в маршрут этих узлов в целевую функцию добавляется штраф равный penalty * кол-во выкинутых узлов. Это отлично работает с мягкими окнами, так как у них за превышение границы окна на одну единицу накладывается точно такой же штраф. Таким образом оптимизатор включит в маршрут новый заказ только в том случае, если это не нарушает ограничение на временные окна.

Ограничение AddPickupAndDelivery

def pickup_delivery_constr(
    data: dict[str, Any],
    manager: RoutingIndexManager,
    routing: RoutingModel,
    time_dimension: RoutingDimension,
) -> None:
    for pickup, delivery in data["pickups_deliveries"]:
        pickup_index = get_index(manager, pickup, routing, data["obligations"])
        delivery_index = get_index(manager, delivery, routing, data["obligations"])
        # Ускоряет оптимизацию
        routing.AddPickupAndDelivery(pickup_index, delivery_index)
        # pickup и delivery на одной машине и активны
        routing.solver().Add(
            routing.ActiveVar(pickup_index) * routing.VehicleVar(pickup_index)
            == routing.ActiveVar(delivery_index) * routing.VehicleVar(delivery_index)
        )
        # pickup меньше или равен delivery по времени
        routing.solver().Add(
            time_dimension.CumulVar(pickup_index)
            <= time_dimension.CumulVar(delivery_index)
        )

Ограничение на вместимость ТС

def capacity_constr(data: dict, manager: RoutingIndexManager, routing: RoutingModel) -> None:
    def demand_callback(from_index):
        from_node = manager.IndexToNode(from_index)
        return data["demands"][from_node]

    demand_callback_index = routing.RegisterUnaryTransitCallback(demand_callback)
    routing.AddDimensionWithVehicleCapacity(
        evaluator_index=demand_callback_index,
        slack_max=0,  # null capacity slack
        vehicle_capacities=data["vehicle_capacities"],  # vehicle maximum capacities
        fix_start_cumul_to_zero=True,  # start cumul to zero
        name="Capacity",
    )

Создаем callback на вместимость. Напомним вам как он выглядят данные для данного ограничения:

data["demands"] = [
    0, 0, 3, 3, -1, -1, -1, -1, -1, -1, 1, -1, 1, -1, 1, -1, 0, 0
]
data["point_types"] = [
    "start", "start", "onboard", "onboard", "delivery", "delivery",
    "delivery", "delivery", "delivery", "delivery", "pickup", "delivery",
    "pickup", "delivery", "pickup", "delivery", "finish", "finish",
]
data["vehicle_capacities"] = [8, 8]

Фича №4

Обратите внимание, на то, что у нас существуют 2 узла с типом «onboard». Для чего они нужны? Снова не баг, а фича.

Нам нужно учесть в ограничении вместимости тех пассажиров, которые уже находятся в ТС. Если вы присвоите, данный demand к стартовым узлам, то оптимизатор не будет учитывать высадку пассажиров (demand = -1). Однако введение фиктивных узлов, на которые присваивается demand, помогает решить данную проблему, и дает нам возможность корректно планировать маршруты.

Закрепление узлов за конкретным ТС

Итак, осталось написать последнее ограничение, которое закрепляет соответствующий узел за конкретным ТС.

def obligation_constr(
    data: dict[str, Any],
    manager: RoutingIndexManager,
    routing: RoutingModel
) -> None:
    for node_idx, obligation in enumerate(data["obligations"]):
        index = get_index(manager, node_idx, routing, data["obligations"])
        if obligation is not None:
            routing.VehicleVar(index).SetValue(obligation)

Для чего это может быть нужно? Например, чтобы закрепить узлы пассажиров, которые непосредственно находятся в ТС. Правда, в нашем случае мы закрепляем узлы всех пассажиров за каким-либо ТС, кроме нового.

Параметры поиска

Объявим параметры поиска:

  • 8 (PARALLEL_CHEAPEST_INSERTION) стратегия для поиска начального решения

  • 2 (GUIDEDLOCALSEARCH) стратегия для улучшения начального решения

  • 500 итераций

  • 3 секунды на оптимизацию

def get_search_params() -> DefaultRoutingSearchParameters:
    search_parameters = pywrapcp.DefaultRoutingSearchParameters()
    search_parameters.first_solution_strategy = 8
    search_parameters.local_search_metaheuristic = 2
    search_parameters.solution_limit = 500
    search_parameters.time_limit.seconds = 3
    return search_parameters

Посмотреть весь остальной список стратегий начального решения и последующего улучшения начального решения можно тут.

Применяем все объявленные параметры для поиска решения

routing.CloseModelWithParameters(search_parameters)

Начальное решение

Как правило для ускорения оптимизации можно добавить изначальное решение без новых пассажиров.

initial_solution = routing.ReadAssignmentFromRoutes(
    data["initial_routes"], True
)

Добавляем в солвер параметры поиска и начальное решение.

Фунцкия для воспроизведения решения

def print_solution(data, manager, routing, solution):
    print(f"Objective: {solution.ObjectiveValue()}")
    max_route_distance = 0
    for vehicle_id in range(data["num_vehicles"]):
        index = routing.Start(vehicle_id)
        plan_output = f"Route for vehicle {vehicle_id}:\n"
        route_distance = 0
        while not routing.IsEnd(index):
            plan_output += f" {manager.IndexToNode(index)} -> "
            index = solution.Value(routing.NextVar(index))
            route_distance += assignment.Min(time_dimension.CumulVar(index))
        plan_output += f"{manager.IndexToNode(index)}\n"
        plan_output += f"Time of the route: {route_distance} minutes \n"
        print(plan_output)
        max_route_distance = max(route_distance, max_route_distance)
    print(f"Maximum of the route: {max_route_distance} minutes")

Итог

Полный код

from typing import Any

from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver.pywrapcp import (
DefaultRoutingSearchParameters, RoutingDimension, RoutingIndexManager,
RoutingModel,
)


def create_data_model ():
data = {}
data[«time_matrix»] = [
[0, 6, 0, 6, 5, 6, 7, 6, 8, 2, 9, 2, 4, 9, 9, 4, 9, 9],
[6, 0, 6, 0, 2, 2, 2, 1, 3, 3, 4, 3, 7, 11, 4, 4, 12, 12],
[0, 6, 0, 6, 5, 6, 7, 6, 8, 2, 9, 2, 4, 9, 9, 4, 9, 9],
[6, 0, 6, 0, 2, 2, 2, 1, 3, 3, 4, 3, 7, 11, 4, 4, 12, 12],
[6, 0, 6, 0, 0, 2, 3, 2, 4, 4, 5, 3, 7, 10, 5, 3, 12, 12],
[9, 4, 9, 4, 3, 0, 1, 2, 1, 6, 3, 6, 9, 12, 3, 6, 14, 14],
[8, 4, 8, 4, 3, 2, 0, 1, 2, 5, 2, 5, 9, 12, 2, 5, 14, 14],
[7, 2, 7, 2, 2, 0, 1, 0, 1, 4, 3, 4, 8, 11, 3, 4, 13, 13],
[9, 4, 9, 4, 4, 2, 0, 2, 0, 6, 2, 6, 10, 13, 2, 6, 14, 14],
[2, 5, 2, 5, 5, 4, 5, 4, 5, 0, 7, 3, 3, 10, 7, 6, 11, 11],
[8, 4, 8, 4, 3, 2, 0, 1, 2, 5, 0, 5, 9, 12, 0, 5, 14, 14],
[3, 4, 3, 4, 4, 4, 5, 4, 6, 2, 7, 0, 5, 9, 7, 3, 10, 10],
[1, 7, 1, 7, 6, 6, 7, 6, 7, 2, 9, 4, 0, 9, 9, 5, 9, 9],
[10, 11, 10, 11, 11, 13, 14, 13, 14, 10, 16, 10, 11, 0, 16, 9, 6, 6],
[8, 4, 8, 4, 3, 2, 0, 1, 2, 5, 0, 5, 9, 12, 0, 5, 14, 14],
[7, 5, 7, 5, 4, 6, 6, 5, 7, 5, 8, 4, 9, 9, 8, 0, 10, 10],
[10, 12, 10, 12, 12, 14, 15, 14, 14, 10, 17, 10, 12, 4, 17, 10, 0, 0],
[10, 12, 10, 12, 12, 14, 15, 14, 14, 10, 17, 10, 12, 4, 17, 10, 0, 0],
]
data[«num_vehicles»] = 2
data[«starts»] = [0, 1]
data[«ends»] = [16, 17]
data[«pickups_deliveries»] = [(10, 11), (12, 13), (14, 15)]
data[«obligations»] = [
0, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0, 0, 1, 1, None, None, 0, 1
]
data[«demands»] = [
0, 0, 3, 3, -1, -1, -1, -1, -1, -1, 1, -1, 1, -1, 1, -1, 0, 0
]
data[«vehicle_capacities»] = [8, 8]
data[«point_types»] = [
«start», «start», «onboard», «onboard», «delivery», «delivery»,
«delivery», «delivery», «delivery», «delivery», «pickup», «delivery»,
«pickup», «delivery», «pickup», «delivery», «finish», «finish»,
]
data[«time_windows»] = [
(0, 0), (0, 0), (0, 0), (0, 0), (0, 9), (0, 11), (0, 11), (0, 4),
(0, 4), (0, 10), (8, 15), (0, 23), (9, 17), (0, 32), (4, 30), (9, 45),
(0, 79), (0, 119),
]
data[«disjunction»] = [(14, 15)]
data[«initial_routes»] = [[2, 4, 6, 5, 10, 11], [3, 8, 7, 9, 12, 13]]
return data

def add_time_dimension (
data: dict[str, Any], manager: RoutingIndexManager, routing: RoutingModel
) → RoutingDimension:
def time_callback (from_index, to_index):
from_node = manager.IndexToNode (from_index)
to_node = manager.IndexToNode (to_index)
return data[«time_matrix»][from_node, to_node]

transit_callback_index = routing.RegisterTransitCallback (time_callback)
routing.AddDimension (
evaluator_index=transit_callback_index,
slack_max=8×60, # allow waiting time
capacity=8×60, # maximum time per vehicle
fix_start_cumul_to_zero=False, # force start cumul to zero
name=«Time»,
)
time_dimension = routing.GetDimensionOrDie («Time»)
routing.SetArcCostEvaluatorOfAllVehicles (transit_callback_index)
time_dimension.SetSpanCostCoefficientForAllVehicles (100)
return time_dimension

def get_index (
manager: RoutingIndexManager,
ind: int,
routing: RoutingModel,
obligations: list[int | None],
) → int:
if manager.NodeToIndex (ind) != -1:
index_node = manager.NodeToIndex (ind)
else:
index_node = routing.End (obligations[ind])
return index_node

def time_window_constr (
data: dict[str, Any],
manager: RoutingIndexManager,
routing: RoutingModel,
time_dimension: RoutingDimension,
penalty: int = 10000
) → None:
for ind, (pt, tw) in enumerate (zip (data[«point_types»], data[«time_windows»])):
index = get_index (manager, ind, routing, data[«obligations»])
if pt not in («delivery», «finish»):
time_dimension.SetCumulVarSoftLowerBound (index, tw[0], int (10e6))
time_dimension.SetCumulVarSoftUpperBound (index, tw[1], penalty * 2)

def disjunction_constr (
data: dict[str, Any],
manager: RoutingIndexManager,
routing: RoutingModel,
penalty: int = 10000
) → None:
for pickup, delivery in data[«disjunction»]:
node_ids = [manager.NodeToIndex (pickup), manager.NodeToIndex (delivery)]
routing.AddDisjunction (node_ids, penalty, 2)


def pickup_delivery_constr (
data: dict[str, Any],
manager: RoutingIndexManager,
routing: RoutingModel,
time_dimension: RoutingDimension,
) → None:
for pickup, delivery in data[«pickups_deliveries»]:
pickup_index = get_index (manager, pickup, routing, data[«obligations»])
delivery_index = get_index (manager, delivery, routing, data[«obligations»])
# Ускоряет оптимизацию
routing.AddPickupAndDelivery (pickup_index, delivery_index)
# pickup и delivery на одной машине и активны
routing.solver ().Add (
routing.ActiveVar (pickup_index) * routing.VehicleVar (pickup_index)
== routing.ActiveVar (delivery_index) * routing.VehicleVar (delivery_index)
)
# pickup меньше или равен delivery по времени
routing.solver ().Add (
time_dimension.CumulVar (pickup_index)
<= time_dimension.CumulVar(delivery_index)
)

def capacity_constr (
data: dict[str, Any],
manager: RoutingIndexManager,
routing: RoutingModel
) → None:
def demand_callback (from_index):
from_node = manager.IndexToNode (from_index)
return data[«demands»][from_node]

demand_callback_index = routing.RegisterUnaryTransitCallback (demand_callback)
routing.AddDimensionWithVehicleCapacity (
evaluator_index=demand_callback_index,
slack_max=0, # null capacity slack
vehicle_capacities=data[«vehicle_capacities»], # vehicle maximum capacities
fix_start_cumul_to_zero=True, # start cumul to zero
name=«Capacity»,
)


def obligation_constr (
data: dict[str, Any],
manager: RoutingIndexManager,
routing: RoutingModel
) → None:
for node_idx, obligation in enumerate (data[«obligations»]):
index = get_index (manager, node_idx, routing, data[«obligations»])
if obligation is not None:
routing.VehicleVar (index).SetValue (obligation)


def get_search_params () → DefaultRoutingSearchParameters:
search_parameters = pywrapcp.DefaultRoutingSearchParameters ()
search_parameters.first_solution_strategy = 8
search_parameters.local_search_metaheuristic = 2
search_parameters.solution_limit = 500
search_parameters.time_limit.seconds = 3
return search_parameters


def print_solution (data, manager, routing, solution):
print (f«Objective: {solution.ObjectiveValue ()}»)
max_route_distance = 0
for vehicle_id in range (data[«num_vehicles»]):
index = routing.Start (vehicle_id)
plan_output = f«Route for vehicle {vehicle_id}:\n»
route_distance = 0
while not routing.IsEnd (index):
plan_output += f» {manager.IndexToNode (index)} → »
index = solution.Value (routing.NextVar (index))
route_distance += assignment.Min (time_dimension.CumulVar (index))
plan_output += f»{manager.IndexToNode (index)}\n»
plan_output += f«Time of the route: {route_distance} minutes \n»
print (plan_output)
max_route_distance = max (route_distance, max_route_distance)
print (f«Maximum of the route: {max_route_distance} minutes»)

if __name__ == »__main__»:
data = create_data_model ()
manager = pywrapcp.RoutingIndexManager (
len (data[«time_matrix»]), data[«num_vehicles»], data[«starts»], data[«ends»]
)
routing = pywrapcp.RoutingModel (manager)

# constraints
time_dimension = add_time_dimension (data=data, manager=manager, routing=routing)
time_window_constr (
data=data,
manager=manager,
routing=routing,
time_dimension=time_dimension,
)

pickup_delivery_constr (
data=data, manager=manager, routing=routing, time_dimension=time_dimension
)
capacity_constr (data=data, manager=manager, routing=routing)
obligation_constr (data=data, manager=manager, routing=routing)
disjunction_constr (data=data, manager=manager, routing=routing)

# solutions
search_parameters = get_search_params ()
# Применяем все объявленные параметры для поиска решения
routing.CloseModelWithParameters (search_parameters)
initial_solution = routing.ReadAssignmentFromRoutes (
data[«initial_routes»], True
)
assignment = routing.SolveFromAssignmentWithParameters (
initial_solution, search_parameters
)
print_solution (data, manager, routing, assignment)

Objective: 1700
Route for vehicle 0:
 0 ->  2 ->  14 ->  15 ->  4 ->  6 ->  5 ->  10 ->  11 -> 16
Time of the route: 52 minutes 

Route for vehicle 1:
 1 ->  3 ->  8 ->  7 ->  9 ->  12 ->  13 -> 17
Time of the route: 27 minutes 

Maximum of the route: 52 minutes

Как итог, можно отметить, что новая заказ с индексами узлов 14 и 15 добавился в 0 ТС.


На этом у меня все, спасибо, что дочитали до этого момента. Надеюсь, материал оказался полезным.

Ссылки:

  1. Официальная документация ortools

  2. Git ortools

© Habrahabr.ru