Простой DICOM клиент на GO с балансировщиком задач и веб-интерфейсом

7a6072855a2a468fb6560fb42ad0b478.jpegПривет Хабр! В последнее время я очень сильно увлекся разработкой на языке GO. Изящный и выразительный язык программирования. Мне давно хотелось сделать что-нибудь полезное. По специфике своей работы мне приходится работать с медицинскими архивами DICOM-изображений PACS.Я решил, что пришло время создать свой dicom-клиент с (блэкджеком…) веб-интерфейсом, который может выполнять следующие стандартные операции: Dicom ping; Cкачивание исследований; Загрузка исследования, А также поиск по реквизитам (c-echo, c-move, c-store, c-find соотвествено).В качестве dicom-библиотеки была выбрана библиотека GrassRoot SDK. Наш клиент будет распараллеливать задачи. Язык go для этого хорошо адаптированПохожий сценарий работы был описан habrahabr.ru/post/198150/.Наш сценарий несколько отличается: У нас есть некий балансировщик задач, который получает задания dicom-сервиса, проверяет возможность выполнения и асинхронно их выполняет. Для того чтобы не было ситуации, когда параллельно выполняется 1000 задач, мы реализуем очередь задач таким образом, чтобы были активные задачи и те, которые находятся в спящем состоянии. По умолчанию только 10 задач будут активными. В противном случае мы могли бы обойтись без баллансировщика вообще, тупо параллельно выполнить 1000 задач параллельно без какого либо контроля.Весь код балансировщика находится в файле job_ballancer.go.

В начале идет описание интерфейсов обработчиков. В случае если работа была выполнена успешно, в случае если вернулась ошибка и сам процесс обработки задачи.

type JobDispatcher interface { Dispatch (interface{}) (interface{}, error) } type ErrDispatcher interface { DispatchError (FaJob) error } type CompDispatcher interface { DispatchSuccess (CompJob) error } Когда мы создаем экземпляр диспетчера, мы его инициализируем соответствующими обработчиками.

srv.jbBal.Init (&srv.dDisp, srv, srv)

//Сама структура балансировщика

type JobBallancer struct { jChan chan interface{} //канал в который мы передаем задания acJob map[string]Job //список активных работ slJob map[string]Job //список неактивных работ errDisp ErrDispatcher //обработчик работ завершившихся error jobDisp JobDispatcher //обработчик заданий compDisp CompDispatcher //обработчик успешно выполненных работ JbDone sync.WaitGroup //ожидаем завершения всех работ aJobC int //количество параллельных (активных) }

//инициализация балансировщика func (jbal *JobBallancer) Init (jdis JobDispatcher, cmd CompDispatcher, erd ErrDispatcher) { jbal.errDisp = erd jbal.jobDisp = jdis jbal.compDisp = cmd jbal.acJob = make (map[string]Job) jbal.slJob = make (map[string]Job) jbal.aJobC = 10 jbal.jChan = make (chan interface{}) go jbal.takeJob () //запускаем поток в котором осуществляем //балансировку работ log.Println («info: job ballancer inited») } добавление новой работы в очередь. Операция асинхронная, т.е. работа отсылается в канал, а функция takeJob подбирает ее от туда. func (jbal *JobBallancer) PushJob (jdat interface{}) error { if jbal.checkInit () { return errors.New («error: JobChan is not inited») } uid:= genUid () job:= Job{JobId: uid, Data: jdat} jbal.jChan <- job return nil

}

func (jbal *JobBallancer) takeJob () { for { //извлекаем работу из канала recivedTask:= <-jbal.jChan log.Println("info: job taken") switch job := recivedTask.(type) { case TermJob: //если мы получаем сигнал на завершение выходим из функции log.Println("info: recive terminate dispatch singal") return case Job: //обычная обработка (если все слоты активных работ заняты то работа записывается в список не активных работ) if len(jbal.acJob) < jbal.aJobC { jbal.JbDone.Add(1) jbal.addActiveJob(job) go jbal.startJob(job) log.Println("info: normal dispatch") } else { jbal.addSleepJob(job) jbal.JbDone.Add(1) log.Println("info: attend maximum active job") } case CompJob: //работа завершилась успехом if err := jbal.compDisp.DispatchSuccess(job); err != nil { log.Println("error: failed dispatch success" + job.Job.JobId) } //удачно завершившуюся работу можно удалить из списка jbal.removeJob(job.Job.JobId) jbal.JbDone.Done() jbal.resumeJobs() case FaJob: //работа завершилась ошибкой if err := jbal.errDisp.DispatchError(job); err != nil { log.Println("error: failed dispatch error" + job.Job.JobId) } //завершившуюся работу можно удалить из списка jbal.removeJob(job.Job.JobId) jbal.JbDone.Done() jbal.resumeJobs() default: log.Fatalln("error: unknown job type") jbal.JbDone.Done() } } } //функция удаления работы func (jbal *JobBallancer) removeJob(jid string) error { if _, isFind := jbal.acJob[jid]; isFind { delete(jbal.acJob, jid) } else { return errors.New("error: can't remove job because job with id not found") } return nil }

//функция позволяющая правильно завершить работу балансировщика, в случае если есть работы которые не завершены, функция будет ожидать их завершения func (jbal *JobBallancer) TerminateTakeJob () error { if jbal.checkInit () { return errors.New («error: is not inited») } jbal.JbDone.Wait () jbal.jChan <- TermJob{} close(jbal.jChan) if len(jbal.acJob) > 0 { return errors.New («error: list job is not empty») } log.Println («info: greacefully terminate take job») return nil } Остальные вспомогательные функции мы не будем рассматривать. полный код можно посмотретьgithub.com/Loafter/dtools/blob/master/dcmjsser/job_ballancer.go

Не смотря, что код не сложный и я долго обдумывал его. Но все равно для проверки надежности я реализовал нагрузочный тест на десятки задач:

testJobDispatcher:= TestJobDispatcher{} testErrorDispatcher:= TestErrorDispatcher{} testSuccessDispatcher:= TestCompletedDispatcher{} jobBallancer:= JobBallancer{} jobBallancer.Init (&testJobDispatcher, &testSuccessDispatcher, &testErrorDispatcher) for i:= 0; i < 40; i++ { jobBallancer.PushJob("data: " + strconv.Itoa(i)) } jobBallancer.TerminateTakeJob() Он отработал нормально. Все задачи были выполнены, а функция TerminateTakeJob завершилась тогда, когда все задачи были выполненны. Для контроля отработаных задач используется объект синхронизации sync.WaitGroup JbDone, который ведет подсчет количества выполненных работ. Как я уже отмечал выше, код балансировщика является универсальным и для того чтобы наш балансировщик работал по-другому, нам достаточно проинстанцировать его соотвествующими обработчиками.Как и в прошлой своей поделке) (http://habrahabr.ru/post/247727/) интерефейс приложения я реализовал в виде веб-интерфейса.15ca5fda32964d42a9360af3abe3fa19.png

Для теста я использовал публичный dicom-архив 213.165.94.158:11112. С него можно скачивать исследования, если есть прямой айпи и если на стороне клиента открыт порт 11112. Так же я проверил работу на свободном dicom-архиве dcm4che sourceforge.net/projects/cdmedicpacsweb/files/latest/download? source=files.Мне удалось собрать рабочую версию для Linux, к сожалению собрать под Widows мне не удалось. Библиотека grassroot успешно собралась, но ошибка возникает при линковке самого приложения.cmd/ld: Malformed PE file: Unexpected flags for PE section.

Об этой ошибке много написано тут: github.com/golang/go/issues/4069.К сожалению я не настолько знаком с тонкостями сборки и поэтому получилась версия только под Linux. Может «хабра-эфект» сдвинет с мертвой точки и эту проблему. Для пользователей Windows, которые хотят проверить и посмотреть как это работает, я подготовил виртуальную машину на базе CoreOS (https://yadi.sk/d/y81KC-tyfar6A). В демо машине наш dicom-клиент работает как systemd-сервис.При наличии желания, можно например реализовать сервис, который выкачивает исследования с различных dicom-узлов, и выкладывает в zip-архиве для скачивания. Для управления сервисом можно использовать json-сообщения, так же, как делает наш GUI.А можно поступить, как поступил я: прикрутить в наше приложение какой-нибудь веб-просмотровщик на базе html5.

7d4994d686974d72bb8045a784e5227b.pngGithub: github.com/Loafter/dtoolsВерсия Linux-amd64: github.com/Loafter/dtools/releases/download/1.0/dcmjsser

© Habrahabr.ru