Разработка менеджера закачек на GO

Менеджер многопоточных закачек на GO.
http://loafter.github.io/godownloader/
32c876eb51264ca084e258a2bfb08e76.jpeg

Вступление


Давным-давно, в году этак 1998, для выхода в интернет я использовал модем на работе у отца. Он его включал вечером после работы и я мог наслаждаться просторами сети интернет на скорости аж 31.2 кбит/c. В то время не было истеричных блогеров, страницы не весили по мегабайту, а в новостных сайтах говорили только правду. Естественно основной интерес представляли ресурсы. Картинки, программы, всякие дополнения к играм, вроде машинок. Как сейчас помню качать через IE было сущим адом. Скачать файл весом больше 500 кб было просто невозможно, древний осел был намного упрямей.
В то время было очень много всяких менеджеров закачек вроде Getright, Go!zilla, Download Accelerator и, конечно, FlashGet. В то время 90% процентов из них были перегружено рекламным говном, лучшим был FlashGet. Он умел бить на части скачиваемый файл и работал шустро. Как сейчас помню версия 1.7 последняя была. Эту версию я в те времена и использовал.
Прошло 15 лет и мне понадобилось скачать большой объем данных через vpn из-за океана.
И что же изменилось за 15 лет?
Да ничего. По прежнему существуют все те же менеджеры с минимальными изменениями. Даже flashget оставил версию 1.7 вместе с новомодной 3.хх.
После того как wxfast download не сумел скачать 50 гигобайтный файл, я решил попробовать написать свой менеджер закачек, который включал бы в себя множество многопоточных заданий с возможностью контроля степени выполнения, остановки в любой момент, а также сохранения состояния загрузок между запусками приложения. Все это отличный вызов для языка GO.

Обертка


С чего начать? Первое что нам необходимо это уметь останавливать закачку и получать информацию о ходе скачивания в любой момент времени. В GO есть легковесные потоки, которые можно использовать у нас в программе. То есть как минимум один поток будет у нас для скачивания, другой для управления процессом (остановка, старт закачки, получение информации о ходе выполнения). Если с процессом получения сведений о загруженных данных проблем не возникает (мы можем получать их по ссылке сквозь потоки), то с процессом останоки закачки все немного сложнее, мы не можем остановить или убить другую goroutine. Но мы можем послать ей сигнал на выход из потока. Собственно так мы и сделаем. Реализуем простую обертку, которая бы позволила создавать произвольные дискретные работы с возможностью паузы и получения информации о состоянии работы.
Для того чтобы обернуть какую-либо структуру необходимо чтобы она подерживала следующий интерфейс:

type DiscretWork interface {
  DoWork() (bool, error)
  GetProgress() interface{}
  BeforeRun() error
  AfterStop() error
}

Сама обертка:

func (mw *MonitoredWorker) wgoroute() {
  log.Println("info: work start", mw.GetId())
  mw.wgrun.Add(1)
  defer func() {
     log.Print("info: release work guid ", mw.GetId())
     mw.wgrun.Done()
  }()

  for {
     select {
     case newState := <-mw.chsig:
        if newState == Stopped {
           mw.state = newState
           log.Println("info: work stopped")
           return
        }
     default:
        {
           isdone, err := mw.Itw.DoWork()
           if err != nil {
              log.Println("error: guid", mw.guid, " work failed", err)
              mw.state = Failed
              return
           }
           if isdone {
              mw.state = Completed
              log.Println("info: work done")
              return
           }
        }

     }
  }
}

func (mw *MonitoredWorker) Start() error {
  mw.lc.Lock()
  defer mw.lc.Unlock()
  if mw.state == Completed {
     return errors.New("error: try run completed job")
  }
  if mw.state == Running {
     return errors.New("error: try run runing job")
  }
  if err := mw.Itw.BeforeRun(); err != nil {
     mw.state = Failed
     return err
  }
  mw.chsig = make(chan int, 1)
  mw.state = Running
  go mw.wgoroute()
  return nil
}



После запуска потока, wgoroute(), в цикле функция пошагово выполняет каждую итерацию вызовом метода DoWork(), В случае, если во время выполнения работы произошла ошибка, функция выходит из цикла и завершает поток. Также в цикле осуществляется выборка из канала.

select {
     case newState := <-mw.chsig:
        if newState == Stopped {
           mw.state = newState
           log.Println("info: work stopped")
           return
        }


Если поступило сообщение Stopped, алгоритм выходит из потока и устанавливает соотвествующее состояние.
Воспользуемся встроенными в язык средствами тестирования для проверки работы обертки:

package dtest

import (
  "errors"
  "fmt"
  "godownloader/monitor"
  "log"
  "math/rand"
  "testing"
  "time"
)

type TestWorkPool struct {
  From, id, To int32
}

func (tw TestWorkPool) GetProgress() interface{} {
  return tw.From

}

func (tw *TestWorkPool) BeforeRun() error {
  log.Println("info: exec before run")
  return nil
}
func (tw *TestWorkPool) AfterStop() error {
  log.Println("info: after stop")
  return nil
}

func (tw *TestWorkPool) DoWork() (bool, error) {
  time.Sleep(time.Millisecond * 300)
  tw.From += 1
  log.Print(tw.From)
  if tw.From == tw.To {
     fmt.Println("done")
     return true, nil
  }
  if tw.From > tw.To {
     return false, errors.New("tw.From > tw.To")
  }
  return false, nil
}
func TestWorkerPool(t *testing.T) {
  wp := monitor.WorkerPool{}
  for i := 0; i < 20; i++ {
     mw := &monitor.MonitoredWorker{Itw: &TestWorkPool{From: 0, To: 20, id: rand.Int31()}}
     wp.AppendWork(mw)
  }
  wp.StartAll()
  time.Sleep(time.Second)
  log.Println("------------------Work Started------------------")
  log.Println(wp.GetAllProgress())
  log.Println("------------------Get All Progress--------------")
  time.Sleep(time.Second)
  wp.StopAll()
  log.Println("------------------Work Stop-------------------")

  time.Sleep(time.Second)
  wp.StartAll()
  time.Sleep(time.Second * 5)
  wp.StopAll()
  wp.StartAll()

  wp.StopAll()
}


Загрузка данных


После того как мы создали рабочую обертку для задач, мы можем приступить к основной функции — загрузки данных по http. Наверное основная проблема протокола http — это низкая скорость при загрузки данных в один поток. Именно поэтому, когда интернет был медленный, появилось столько менеджеров закачек, которые умели разбивать загружаемый файл на фрагменты и загружать их в несколько http-соединений, за счет этого достигался выйгрыш в скорости. Естественно наш менеджер закачек не исключение, он также должен уметь забирать файл в несколько потоков. Для нормальной работы данной схемы необходимо чтобы сервер поддерживал докачку.
Со стороны клиента должен быть сформирован запрос у которого есть в заголовке поле Range. Весь запрос в сыром виде выглядит вот так:

GET /PinegrowLinux64.2.2.zip HTTP/1.1
Host: pinegrow.s3.amazonaws.com
User-Agent: Go-http-client/1.1
Range: bytes=34010904-42513630


Первая моя реализация работала крайне медленно. Все дело в том, что на каждую небольшую порцию данных готовился свой запрос, т.е. если необходимо было скачать сегмент от 1 до 2 мегобайтов блоками по 100 кб. это означало, что последовантельно было выполненно 10 запросов для каждого блока. Я достаточно быстро понял, что что-то не так.
В программе wireshark я проверил как выполняется закачка другой программой — download master. Правильная схема работы была другая. Если нам нужно скачать 10 сегментов, то сначала готовилось 10 http-запросов на каждый сегмент, а деление на блоки было реализовано последовательным считыванием из блока body в рамках одного http-response.

func (pd *PartialDownloader) BeforeDownload() error {
  //create new req
  r, err := http.NewRequest("GET", pd.url, nil)
  if err != nil {
     return err
  }

  r.Header.Add("Range", "bytes="+strconv.FormatInt(pd.dp.Pos, 10)+"-"+strconv.FormatInt(pd.dp.To, 10))
  f,_:=iotools.CreateSafeFile("test")
  r.Write(f)
  f.Close()
  resp, err := pd.client.Do(r)
  if err != nil {
     log.Printf("error: error download part file%v \n", err)
     return err
  }
  //check response
  if resp.StatusCode != 206 {
     log.Printf("error: file not found or moved status:", resp.StatusCode)
     return errors.New("error: file not found or moved")
  }
  pd.req = *resp
  return nil
}
….
func (pd *PartialDownloader) DownloadSergment() (bool, error) {
  //write flush data to disk
  buffer := make([]byte, FlushDiskSize, FlushDiskSize)

  count, err := pd.req.Body.Read(buffer)
  if (err != nil) && (err.Error() != "EOF") {
     pd.req.Body.Close()
     pd.file.Sync()
     return true, err
  }
  //log.Printf("returned from server %v bytes", count)
  if pd.dp.Pos+int64(count) > pd.dp.To {
     count = int(pd.dp.To - pd.dp.Pos)
     log.Printf("warning: server return to much for me i give only %v bytes", count)
  }

  realc, err := pd.file.WriteAt(buffer[:count], pd.dp.Pos)
  if err != nil {
     pd.file.Sync()
     pd.req.Body.Close()
     return true, err
  }
  pd.dp.Pos = pd.dp.Pos + int64(realc)
  pd.messureSpeed(realc)
  //log.Printf("writed %v pos %v to %v", realc, pd.dp.Pos, pd.dp.To)
  if pd.dp.Pos == pd.dp.To {
     //ok download part complete normal
     pd.file.Sync()
     pd.req.Body.Close()
     pd.dp.Speed = 0
     log.Printf("info: download complete normal")
     return true, nil
  }
  //not full download next segment
  return false, nil
}



Обернув класс загрузчика в интерефейс DiscretWork из предыдущей части заметки мы можем попробовать протестировать его работу

func TestDownload(t *testing.T) {
  dl, err := httpclient.CreateDownloader("http://pinegrow.s3.amazonaws.com/PinegrowLinux64.2.2.zip", "PinegrowLinux64.2.2.zip", 7)
  if err != nil {
     t.Error("failed: can't create downloader")
  }
  errs := dl.StartAll()
  if len(errs)>0 {
     t.Error("failed: can't start downloader")
  }
…..wait for finish download
}


Интерфейс


Уже достаточно долгое время все свои сервисы на go я делаю по одной схеме. Как правило, web-интерейс через который пользователь взаимодействует с json-сервисом посредствам http-запросов. Такая схема работы дает ряд преимуществ перед традиционными графическими интерейсами перечисленными ниже.

  • Возможность в автоматическом режиме добавлять новые закачки;
  • Нет привязки к определенной операционной системе, достаточно только браузера.

Обновление интерфейса осуществляется каждые 500 милисекунд. В качестве источника данных для таблицы закачек используется псевдо-файл localhost/progress.json. Если его открыть в браузере, откроются динамически обновляемые json-данные. В качестве компонента таблицы используется jgrid. Благодаря его простоте код занимает совсем немного места.
image
image

function UpdateTable() {
  $("#jqGrid")
     .jqGrid({
        url: 'http://localhost:9981/progress.json',
        mtype: "GET",
        ajaxSubgridOptions: {
           async: false
        },
        styleUI: 'Bootstrap',
        datatype: "json",
        colModel: [{
           label: '#',
           name: 'Id',
           key: true,
           width: 5
        },
….. 
        {
           label: 'Speed',
           name: 'Speed',
           width: 15,
           formatter: FormatByte
        }, {
           label: 'Progress',
           name: 'Progress',
           formatter: FormatProgressBar
        }],
        viewrecords: true,
        rowNum: 20,
        pager: "#jqGridPager"
     });
}



Завершение сервиса и сохранение настроек


Есть ещё интересная особенность сервиса о которой я бы хотел рассказать. Это то, как завершается веб-сервис. Дело в том, что в момент запуска http-сервиса, программа зависает на функции start и виснет пока мы не завершим приложение. Но в Go есть возможность подписаться на сигналы посылаемые операционной системой. Таким образом мы можем перехватить момент, когда наш процесс завершается, даже если мы это делаем через комманду kill и выполняем какие-либо завершающие действия. К примеру, это сохранение настроек и текущий прогресс закачек.

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
go func() {
  <-c
  func() {
     gdownsrv.StopAllTask()
     log.Println("info: save setting ", gdownsrv.SaveSettings(getSetPath()))
  }()
  os.Exit(1)
}()


Существуют много расширений стандартной реализации http-сервиса go позволяющих выполнить какие-либо действия после завершения сервиса. На мой взгляд описанный выше способ наиболее прост и надежен, данный способ работает даже если мы убивает сервис.
В принцие это наверное все, что я хотел донести для читателей.
Не знаю на сколько актуальным получился менеджер закачек для других, но дистрибутивы и образы виртуальных машин я уже качаю при помощи своего менеджера закачек.
Но контрольные суммы периодически всеже проверяю)
Скачать релиз под Mac, Windows, Linux
http://loafter.github.io/godownloader/

© Habrahabr.ru