Разработка менеджера закачек на GO
Менеджер многопоточных закачек на GO.
http://loafter.github.io/godownloader/
Вступление
Давным-давно, в году этак 1998, для выхода в интернет я использовал модем на работе у отца. Он его включал вечером после работы и я мог наслаждаться просторами сети интернет на скорости аж 31.2 кбит/c. В то время не было истеричных блогеров, страницы не весили по мегабайту, а в новостных сайтах говорили только правду. Естественно основной интерес представляли ресурсы. Картинки, программы, всякие дополнения к играм, вроде машинок. Как сейчас помню качать через IE было сущим адом. Скачать файл весом больше 500 кб было просто невозможно, древний осел был намного упрямей.
В то время было очень много всяких менеджеров закачек вроде Getright, Go!zilla, Download Accelerator и, конечно, FlashGet. В то время 90% процентов из них были перегружено рекламным говном, лучшим был FlashGet. Он умел бить на части скачиваемый файл и работал шустро. Как сейчас помню версия 1.7 последняя была. Эту версию я в те времена и использовал.
Прошло 15 лет и мне понадобилось скачать большой объем данных через vpn из-за океана.
И что же изменилось за 15 лет?
Да ничего. По прежнему существуют все те же менеджеры с минимальными изменениями. Даже flashget оставил версию 1.7 вместе с новомодной 3.хх.
После того как wxfast download не сумел скачать 50 гигобайтный файл, я решил попробовать написать свой менеджер закачек, который включал бы в себя множество многопоточных заданий с возможностью контроля степени выполнения, остановки в любой момент, а также сохранения состояния загрузок между запусками приложения. Все это отличный вызов для языка GO.
Обертка
С чего начать? Первое что нам необходимо это уметь останавливать закачку и получать информацию о ходе скачивания в любой момент времени. В GO есть легковесные потоки, которые можно использовать у нас в программе. То есть как минимум один поток будет у нас для скачивания, другой для управления процессом (остановка, старт закачки, получение информации о ходе выполнения). Если с процессом получения сведений о загруженных данных проблем не возникает (мы можем получать их по ссылке сквозь потоки), то с процессом останоки закачки все немного сложнее, мы не можем остановить или убить другую goroutine. Но мы можем послать ей сигнал на выход из потока. Собственно так мы и сделаем. Реализуем простую обертку, которая бы позволила создавать произвольные дискретные работы с возможностью паузы и получения информации о состоянии работы.
Для того чтобы обернуть какую-либо структуру необходимо чтобы она подерживала следующий интерфейс:
type DiscretWork interface {
DoWork() (bool, error)
GetProgress() interface{}
BeforeRun() error
AfterStop() error
}
Сама обертка:
func (mw *MonitoredWorker) wgoroute() {
log.Println("info: work start", mw.GetId())
mw.wgrun.Add(1)
defer func() {
log.Print("info: release work guid ", mw.GetId())
mw.wgrun.Done()
}()
for {
select {
case newState := <-mw.chsig:
if newState == Stopped {
mw.state = newState
log.Println("info: work stopped")
return
}
default:
{
isdone, err := mw.Itw.DoWork()
if err != nil {
log.Println("error: guid", mw.guid, " work failed", err)
mw.state = Failed
return
}
if isdone {
mw.state = Completed
log.Println("info: work done")
return
}
}
}
}
}
func (mw *MonitoredWorker) Start() error {
mw.lc.Lock()
defer mw.lc.Unlock()
if mw.state == Completed {
return errors.New("error: try run completed job")
}
if mw.state == Running {
return errors.New("error: try run runing job")
}
if err := mw.Itw.BeforeRun(); err != nil {
mw.state = Failed
return err
}
mw.chsig = make(chan int, 1)
mw.state = Running
go mw.wgoroute()
return nil
}
После запуска потока, wgoroute(), в цикле функция пошагово выполняет каждую итерацию вызовом метода DoWork(), В случае, если во время выполнения работы произошла ошибка, функция выходит из цикла и завершает поток. Также в цикле осуществляется выборка из канала.
select {
case newState := <-mw.chsig:
if newState == Stopped {
mw.state = newState
log.Println("info: work stopped")
return
}
Если поступило сообщение Stopped, алгоритм выходит из потока и устанавливает соотвествующее состояние.
Воспользуемся встроенными в язык средствами тестирования для проверки работы обертки:
package dtest
import (
"errors"
"fmt"
"godownloader/monitor"
"log"
"math/rand"
"testing"
"time"
)
type TestWorkPool struct {
From, id, To int32
}
func (tw TestWorkPool) GetProgress() interface{} {
return tw.From
}
func (tw *TestWorkPool) BeforeRun() error {
log.Println("info: exec before run")
return nil
}
func (tw *TestWorkPool) AfterStop() error {
log.Println("info: after stop")
return nil
}
func (tw *TestWorkPool) DoWork() (bool, error) {
time.Sleep(time.Millisecond * 300)
tw.From += 1
log.Print(tw.From)
if tw.From == tw.To {
fmt.Println("done")
return true, nil
}
if tw.From > tw.To {
return false, errors.New("tw.From > tw.To")
}
return false, nil
}
func TestWorkerPool(t *testing.T) {
wp := monitor.WorkerPool{}
for i := 0; i < 20; i++ {
mw := &monitor.MonitoredWorker{Itw: &TestWorkPool{From: 0, To: 20, id: rand.Int31()}}
wp.AppendWork(mw)
}
wp.StartAll()
time.Sleep(time.Second)
log.Println("------------------Work Started------------------")
log.Println(wp.GetAllProgress())
log.Println("------------------Get All Progress--------------")
time.Sleep(time.Second)
wp.StopAll()
log.Println("------------------Work Stop-------------------")
time.Sleep(time.Second)
wp.StartAll()
time.Sleep(time.Second * 5)
wp.StopAll()
wp.StartAll()
wp.StopAll()
}
Загрузка данных
После того как мы создали рабочую обертку для задач, мы можем приступить к основной функции — загрузки данных по http. Наверное основная проблема протокола http — это низкая скорость при загрузки данных в один поток. Именно поэтому, когда интернет был медленный, появилось столько менеджеров закачек, которые умели разбивать загружаемый файл на фрагменты и загружать их в несколько http-соединений, за счет этого достигался выйгрыш в скорости. Естественно наш менеджер закачек не исключение, он также должен уметь забирать файл в несколько потоков. Для нормальной работы данной схемы необходимо чтобы сервер поддерживал докачку.
Со стороны клиента должен быть сформирован запрос у которого есть в заголовке поле Range. Весь запрос в сыром виде выглядит вот так:
GET /PinegrowLinux64.2.2.zip HTTP/1.1
Host: pinegrow.s3.amazonaws.com
User-Agent: Go-http-client/1.1
Range: bytes=34010904-42513630
Первая моя реализация работала крайне медленно. Все дело в том, что на каждую небольшую порцию данных готовился свой запрос, т.е. если необходимо было скачать сегмент от 1 до 2 мегобайтов блоками по 100 кб. это означало, что последовантельно было выполненно 10 запросов для каждого блока. Я достаточно быстро понял, что что-то не так.
В программе wireshark я проверил как выполняется закачка другой программой — download master. Правильная схема работы была другая. Если нам нужно скачать 10 сегментов, то сначала готовилось 10 http-запросов на каждый сегмент, а деление на блоки было реализовано последовательным считыванием из блока body в рамках одного http-response.
func (pd *PartialDownloader) BeforeDownload() error {
//create new req
r, err := http.NewRequest("GET", pd.url, nil)
if err != nil {
return err
}
r.Header.Add("Range", "bytes="+strconv.FormatInt(pd.dp.Pos, 10)+"-"+strconv.FormatInt(pd.dp.To, 10))
f,_:=iotools.CreateSafeFile("test")
r.Write(f)
f.Close()
resp, err := pd.client.Do(r)
if err != nil {
log.Printf("error: error download part file%v \n", err)
return err
}
//check response
if resp.StatusCode != 206 {
log.Printf("error: file not found or moved status:", resp.StatusCode)
return errors.New("error: file not found or moved")
}
pd.req = *resp
return nil
}
….
func (pd *PartialDownloader) DownloadSergment() (bool, error) {
//write flush data to disk
buffer := make([]byte, FlushDiskSize, FlushDiskSize)
count, err := pd.req.Body.Read(buffer)
if (err != nil) && (err.Error() != "EOF") {
pd.req.Body.Close()
pd.file.Sync()
return true, err
}
//log.Printf("returned from server %v bytes", count)
if pd.dp.Pos+int64(count) > pd.dp.To {
count = int(pd.dp.To - pd.dp.Pos)
log.Printf("warning: server return to much for me i give only %v bytes", count)
}
realc, err := pd.file.WriteAt(buffer[:count], pd.dp.Pos)
if err != nil {
pd.file.Sync()
pd.req.Body.Close()
return true, err
}
pd.dp.Pos = pd.dp.Pos + int64(realc)
pd.messureSpeed(realc)
//log.Printf("writed %v pos %v to %v", realc, pd.dp.Pos, pd.dp.To)
if pd.dp.Pos == pd.dp.To {
//ok download part complete normal
pd.file.Sync()
pd.req.Body.Close()
pd.dp.Speed = 0
log.Printf("info: download complete normal")
return true, nil
}
//not full download next segment
return false, nil
}
Обернув класс загрузчика в интерефейс DiscretWork из предыдущей части заметки мы можем попробовать протестировать его работу
func TestDownload(t *testing.T) {
dl, err := httpclient.CreateDownloader("http://pinegrow.s3.amazonaws.com/PinegrowLinux64.2.2.zip", "PinegrowLinux64.2.2.zip", 7)
if err != nil {
t.Error("failed: can't create downloader")
}
errs := dl.StartAll()
if len(errs)>0 {
t.Error("failed: can't start downloader")
}
…..wait for finish download
}
Интерфейс
Уже достаточно долгое время все свои сервисы на go я делаю по одной схеме. Как правило, web-интерейс через который пользователь взаимодействует с json-сервисом посредствам http-запросов. Такая схема работы дает ряд преимуществ перед традиционными графическими интерейсами перечисленными ниже.
- Возможность в автоматическом режиме добавлять новые закачки;
- Нет привязки к определенной операционной системе, достаточно только браузера.
Обновление интерфейса осуществляется каждые 500 милисекунд. В качестве источника данных для таблицы закачек используется псевдо-файл localhost/progress.json. Если его открыть в браузере, откроются динамически обновляемые json-данные. В качестве компонента таблицы используется jgrid. Благодаря его простоте код занимает совсем немного места.
function UpdateTable() {
$("#jqGrid")
.jqGrid({
url: 'http://localhost:9981/progress.json',
mtype: "GET",
ajaxSubgridOptions: {
async: false
},
styleUI: 'Bootstrap',
datatype: "json",
colModel: [{
label: '#',
name: 'Id',
key: true,
width: 5
},
…..
{
label: 'Speed',
name: 'Speed',
width: 15,
formatter: FormatByte
}, {
label: 'Progress',
name: 'Progress',
formatter: FormatProgressBar
}],
viewrecords: true,
rowNum: 20,
pager: "#jqGridPager"
});
}
Завершение сервиса и сохранение настроек
Есть ещё интересная особенность сервиса о которой я бы хотел рассказать. Это то, как завершается веб-сервис. Дело в том, что в момент запуска http-сервиса, программа зависает на функции start и виснет пока мы не завершим приложение. Но в Go есть возможность подписаться на сигналы посылаемые операционной системой. Таким образом мы можем перехватить момент, когда наш процесс завершается, даже если мы это делаем через комманду kill и выполняем какие-либо завершающие действия. К примеру, это сохранение настроек и текущий прогресс закачек.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
go func() {
<-c
func() {
gdownsrv.StopAllTask()
log.Println("info: save setting ", gdownsrv.SaveSettings(getSetPath()))
}()
os.Exit(1)
}()
Существуют много расширений стандартной реализации http-сервиса go позволяющих выполнить какие-либо действия после завершения сервиса. На мой взгляд описанный выше способ наиболее прост и надежен, данный способ работает даже если мы убивает сервис.
В принцие это наверное все, что я хотел донести для читателей.
Не знаю на сколько актуальным получился менеджер закачек для других, но дистрибутивы и образы виртуальных машин я уже качаю при помощи своего менеджера закачек.
Но контрольные суммы периодически всеже проверяю)
Скачать релиз под Mac, Windows, Linux
http://loafter.github.io/godownloader/