Going NATS
Все инновации так или иначе будут связаны с глобальными, распределенными системами в которых ключевую роль будут играть периферийные вычисления.
Многие из инструментов, которые мы успешно используем для решения текущих задач, могут быть неадекватными для новых вызовов.
В качестве примера таких инструментов, можно привести Ansible и SSH. Ansible, как известно использует SSH в качестве основного транспорта для выполнения команд на удаленных машинах. Это своего рода RPC для системных администраторов, который для управления конфигурациями использует YAML.
Ansible отлично подходит для управления парком из нескольких сотен серверов в дата-центре, но любой, кто пытался использовать его для управления тысячами машин, обязательно сталкивался с трудностями масштабирования.
В мире где работают десятки тысяч микросервисов, периферийных устройств и функций, возможности Ansible неадекватны и это стало одной из причин появления таких проектов как Kubernetes.
Но для Edge AI и Agentic AI нужны новые подходы.
NATS это система для обмена сообщений, которая, на мой взгляд отлично подходит для решения такого рода задач.
Например NATS может стать для основой для централизованной системы управления серверами и периферийными устройствами.
Для тех кто знаком не нужно будет объяснять принципы работы pub/sub. Архитектурно NATS похож на Kafka и RabbitMQ. Основное отличие производительность и простота в использовании, которая делает NATS идеальным выбором именно для периферийных вычислений.
Ниже приведен очень простой пример RPC для LLM на удаленной машине.
Для установки NATS:
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@v2.10.20 | sh
Запускаем сервер NATS:
./nats-server -m 8222 &
Клиентской части добавлен простой UI который запускается на http://localhost:8080:
Command Publisher
Subscriber:
package main
import (
"fmt"
"os/exec"
"runtime"
"github.com/nats-io/nats.go"
)
func executeCommand(command string) (string, error) {
var cmd *exec.Cmd
if runtime.GOOS == "windows" {
cmd = exec.Command("cmd", "/C", command)
} else {
cmd = exec.Command("sh", "-c", command)
}
output, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("error executing command: %v, output: %s", err, string(output))
}
return string(output), nil
}
func main() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
fmt.Println("Error connecting to NATS:", err)
return
}
defer nc.Close()
_, err = nc.Subscribe("commands", func(msg *nats.Msg) {
command := string(msg.Data)
fmt.Printf("Received command: %s\n", command)
output, err := executeCommand(command)
if err != nil {
fmt.Println("Error:", err)
nc.Publish(msg.Reply, []byte(fmt.Sprintf("Error: %v", err)))
return
}
nc.Publish(msg.Reply, []byte(output))
fmt.Printf("Command executed successfully: %s\n", output)
})
if err != nil {
fmt.Println("Error subscribing to subject:", err)
return
}
fmt.Println("Remote host is listening for commands...")
select {}
}
и
Publisher:
package main
import (
"fmt"
"html/template"
"log"
"net/http"
"time"
"github.com/nats-io/nats.go"
)
const natsServer = "nats://localhost:4222"
const htmlTemplate = `
Command Publisher
Command Publisher
{{if .Response}}
Response:
{{.Response}}
{{end}}
`
type PageData struct {
Response string
}
func main () {
nc, err:= nats.Connect (natsServer)
if err!= nil {
log.Fatalf («Error connecting to NATS: %v», err)
}
defer nc.Close ()
http.HandleFunc (»/», func (w http.ResponseWriter, r *http.Request) {
tmpl:= template.Must (template.New («index»).Parse (htmlTemplate))
data:= PageData{Response:»}
tmpl.Execute (w, data)
})
http.HandleFunc (»/publish», func (w http.ResponseWriter, r *http.Request) {
if err:= r.ParseForm (); err!= nil {
http.Error (w, «Error parsing form», http.StatusBadRequest)
return
}
command:= r.FormValue («command»)
if command == » {
http.Error (w, «Command cannot be empty», http.StatusBadRequest)
return
}
msg, err:= nc.Request («commands», []byte (command), 5*time.Second)
if err!= nil {
http.Error (w, fmt.Sprintf («Error sending command: %v», err), http.StatusInternalServerError)
return
}
tmpl:= template.Must (template.New («index»).Parse (htmlTemplate))
data:= PageData{Response: string (msg.Data)}
tmpl.Execute (w, data)
})
fmt.Println («Web UI is running on http://localhost:8080»)
log.Fatal (http.ListenAndServe (»:8080», nil))
}
Агент AI — это просто программа которая может выполнятся на каком-то устройстве. Несложно представить, что он может использовать инструкции пользователя для управления переферийным устройством: камера, манипулятор, он может самостоятельно отслеживать метрики. Но в данном случае это просто LLM которому можно так же как и в обычном Ansible плейбуке передать команды в YAML файле.
Manifest
Только для этого нужно будет добавить YAML парсер в клиентскую часть.
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/nats-io/nats.go"
"gopkg.in/yaml.v3"
)
const natsServer = "nats://localhost:4222"
type Task struct {
Name string `yaml:"name"`
Command string `yaml:"command"`
}
type Manifest struct {
Tasks []Task `yaml:"tasks"`
}
func main() {
nc, err := nats.Connect(natsServer)
if err != nil {
log.Fatalf("Error connecting to NATS: %v", err)
}
defer nc.Close()
yamlFile, err := os.ReadFile("manifest.yaml")
if err != nil {
log.Fatalf("Error reading YAML file: %v", err)
}
var manifest Manifest
err = yaml.Unmarshal(yamlFile, &manifest)
if err != nil {
log.Fatalf("Error parsing YAML file: %v", err)
}
for _, task := range manifest.Tasks {
fmt.Printf("Executing task: %s\n", task.Name)
fmt.Printf("Command: %s\n", task.Command)
msg, err := nc.Request("commands", []byte(task.Command), 5*time.Second)
if err != nil {
if err == nats.ErrTimeout {
fmt.Println("Error: Request timed out. The subscriber did not respond in time.")
} else {
fmt.Println("Error sending command:", err)
}
continue
}
fmt.Printf("Response from remote host:\n%s\n", string(msg.Data))
fmt.Println("----------------------------------------")
}
}
client.go
«Будущее уже наступило, просто оно еще неравномерно распределено». Но для того, чтобы соответствовать масштабу задач, нам понадобятся новые инструменты.