[Из песочницы] Автономный ftp-client с докачкой файлов

Хочу поделиться своим опытом в разработке автономного ftp клиента.

В наличии имеется ftp-сервер, на котором периодически появляются данные в виде графических изображений и текстовых файлов, их размер варьируется от десятков килобайт до пары гигабайт. Доступ в интернет может быть через провод, а может быть через GSM-свисток или вообще по спутнику, то есть стабильным и нестабильным соответственно. Во втором случае резко повышается вероятность потери соединения из-за погодных условий, географического положения и т.п.

Итак, требования к клиенту следующие:

  1. Опрос ftp-сервера на наличие новых файлов и их последующая загрузка.
  2. В случае внезапной остановки загрузки (будь то обрыв соединения, или упадёт система, на которой стоит мой ftp-клиент), закачка должна продолжиться при первой возможности.
  3. Ограничение скорости загрузки (это связано со стоимостью трафика по GSM).


Если интересен мой способ решения задачи, прошу под кат!
Для удобства можно разбить всю статью на ключевые этапы работы клиента, с примерами кода и более подробным описанием тонкостей работы.

Постановка задачи


Подумав, я решил написать клиент, который работает по следующей схеме:

  1. Стучимся на сервер, получаем список файлов.
  2. Смотрим в свою историю загрузок, если файл отсутствует в истории, то добавляем файл в очередь загрузки.
  3. Если файл по каким-то причинам не удалось скачать, он отправляется в конец очереди загрузки.
  4. Успешно загруженный файл добавляется в историю.


И некоторые особенности:

  • История хранится runtime и дублируется в xml-файл, откуда можно будет восстановить историю
  • Клиент поддерживает загрузку нескольких файлов одновременно в разных потоках


Периодический опрос сервера и получение списка файлов


Решение периодического опроса сервера приходит на ум почти сразу — запустить таймер, в котором будет заключён метод получения списка файлов. Однако, сервер имеет немного своеобразную структуру каталогов. Если говорить коротко, то на сервере есть две папки — notify и files. В папке files хранятся сами данные, которые требуется скачать, и все они имеют уникальные имена по типу FILE_ID_xxx, где х — любая цифра. Папка notify содержит xml-файлы с описанием файлов из папки files, в т. ч. их настоящее имя, дату размещения на сервере и размер.

Прочитав все xml из папки notify, я формирую коллекцию из нехитрого FileItem:

public class FileItem
    {
        [XmlAttribute(AttributeName = "RemoteUri")]
        public string RemoteUri;
        [XmlAttribute(AttributeName = "SavePath")]
        public string SavePath;
        [XmlAttribute(AttributeName = "Date")]
        public string Date;
        [XmlAttribute(AttributeName = "RefId")]
        public string RefId;
        [XmlAttribute(AttributeName = "Name")]
        public string Name;
        [XmlAttribute(AttributeName = "Extention")]
        public string Extention;
        [XmlAttribute(AttributeName = "Size")]
        public long Size;
    }


А далее, пробегаясь по коллекции, проверяем, присутствует ли файл в истории загрузок, и не загружается ли он в данный момент

foreach (var df in dataFiles)
{
    if (!FileHistory.FileExists(df) && !client.AlreadyInProgress(df))
    {
        client.DownloadFile(df);
    }
}


Вот и всё. Опрос сервера и поиск новых файлов закончен. О том, кто такие FileHistory и client — напишу далее.

Загрузка файлов в несколько потоков


»client» в коде выше — это экземпляр класса FTPClient, занимающегося только загрузкой файлов с сервера. И по факту FTPClient — моя обертка FtpWebRequest.

FTPClient имеет в себе потокобезопасную очередь, называемую «очередью загрузки»:

private ConcurrentQueue downloadQueue;


Итак, что происходит при вызове метода DownloadFile:

public void DownloadFile(FileItem file)
{
    downloadQueue.Enqueue(file);
    StartDownloadTask();
}


Всё довольно просто — файл добавляется в очередь на загрузку, и вслед за этим вызывается метод, который создает задачу по загрузке файла, используя TPL. Вот как это выглядит:

private void StartDownloadTask()
        {
            if (currentActiveDownloads <= Settings.MaximumDownloadThreads)
            {
                FileItem file;
                if (!downloadQueue.IsEmpty && downloadQueue.TryDequeue(out file))
                {
                    Task t;
                    if (File.Exists(file.SavePath))
                    {
                        FileInfo info = new FileInfo(file.SavePath);
                        var currentSize = info.Length;
                        t = new Task(() => DownloadTask(file, currentSize));
                    }
                    else
                    {
                        t = new Task(() => DownloadTask(file, 0));
                    }

                    t.ContinueWith(OnTaskComplete);
                    t.Start();

                    Interlocked.Increment(ref currentActiveDownloads);
                    lock (inProgressLock)
                    {
                        inProgress.Add(file);
                    }
                }
            }


Говоря русским языком, сначала проверим сколько уже работает тасков по загрузке файла, и если есть возможность пропихнуть ещё один. Затем пытаемся получить FileItem из очереди загрузки, если очередь не пуста. Потом определяем, присутствует ли файл уже локально, или нет. А локально присутствовать файл может в том случае, если загрузка неожиданно прервалась. Всё, что мы успели скачать, остаётся на диске. Так вот, в этом случае мы просто начнём загрузку с того места, на котором остановились.

Метод OnTaskComplete, который вызовется после завершения DownloadTask:

private void OnTaskComplete(Task t)
        {
            Interlocked.Decrement(ref currentActiveDownloads);
            StartDownloadTask();
        }


То есть уменьшим счетчик активных загрузок, и попробуем начать новый таск загрузки. То есть, получается что новый таск на загрузку будет создаваться при добавлении нового файла в очередь загрузки и при завершении текущего таска на загрузку.

Теперь метод, занимающийся непосредственно загрузкой файла с сервера:

private void DownloadTask(FileItem file, long offset)
        {
            // Перед началом загрузки поставим поток на паузу. В случае, если файл не доступен по какой-то причине, то мы не будем спамить на сервер в попытках достучаться до него
            Thread.Sleep(10 * 1000);

            Log.Info(string.Format("Загружается файл {0}", file.Name));

            try
            {
                if (offset == file.Size)
                {
                    Log.Info(string.Format("Файл {0} уже полностью скачан.", file.Name));
                    FileHistory.AddToDownloadHistory(file);
                    return;
                }

                using (var readStream = GetResponseStreamFromServer(file.RemoteUri, WebRequestMethods.Ftp.DownloadFile, offset))
                {
                    using (var writeStream = new FileStream(file.SavePath, FileMode.Append, FileAccess.Write))
                    {
                        var bufferSize = 1024;
                        var buffer = new byte[bufferSize];

                        int second = 1000;
                        int timePassed = 0;
                        var stopWatch = new Stopwatch();

                        var readCount = readStream.Read(buffer, 0, bufferSize);
                        int downloadedBytes = readCount;
                        
                        while(readCount > 0)
                        {
                            // Считаем данные потока и засечём сколько на это ушло времени
                            stopWatch.Start();
                            writeStream.Write(buffer, 0, readCount);
                            readCount = readStream.Read(buffer, 0, bufferSize);
                            stopWatch.Stop();

                            // Если скорость ограничена (0 считается за отсутствие ограничения)
                            if (Settings.MaximumDownloadSpeed > 0)
                            {
                                var downloadLimit = (Settings.MaximumDownloadSpeed * 1024 / 8) / currentActiveDownloads;

                                downloadedBytes += readCount;
                                timePassed += (int)stopWatch.ElapsedMilliseconds;

                                if (downloadedBytes >= downloadLimit)
                                {
                                    var pause = second - timePassed;
                                    if (pause > 0)
                                        Thread.Sleep(pause);
                                    timePassed = 0;
                                    downloadedBytes = 0;
                                    stopWatch.Reset();
                                }

                                if (timePassed > second)
                                {
                                    stopWatch.Reset();
                                    timePassed = 0;
                                    downloadedBytes = 0;
                                }
                            }
                        }
                    }
                }

                lock (inProgressLock)
                {
                    inProgress.Remove(file);
                }

                FileHistory.AddToDownloadHistory(file);
                Log.Info(string.Format("Файл загружен - {0}", file.Name));
                Interlocked.Add(ref currentLoadedSize, -file.Size);
            }
            catch (WebException e)
            {
                Log.Error(e);
                downloadQueue.Enqueue(file);
            }
            catch (Exception e)
            {
                Log.Error(e);
            }

        }


И метод, который формирует запрос к серверу и возвращает ответ:

private Stream GetResponseStreamFromServer(string uri, string method, long offset)
        {
            var request = (FtpWebRequest)WebRequest.Create(uri);

            request.UseBinary = true;
            request.Credentials = new NetworkCredential(Settings.Login, Settings.Password);
            request.Method = method;
            request.Proxy = null;
            request.KeepAlive = false;
            request.ContentOffset = offset;

            var response = request.GetResponse();

            return response.GetResponseStream();

        }


То есть, чтобы начать чтение потока не с начала, используется строка при формировании запроса: ё

request.ContentOffset = offset;


А ограничение скорости работает следующим образом: первым делом расчитаем downloadLimit, сколько байт может загрузить текущий поток. Учитывается общее ограничение скорости и количество активных потоков загрузки. Затем читаем поток 1024 байта. Засекли, сколько времени это заняло (timePassed). Общее количество считанных байт записывается в downloadedBytes.

При превышении лимита ставим поток на паузу на оставшееся время до конца секунды:

var pause = second - timePassed;
if (pause > 0)
    Thread.Sleep(pause);


По истечению секунды счётчики обнуляются.

И в случае WebExeption файл снова добавится в очередь загрузки. А в историю файл попадёт только после успешного завершения.

История загрузок


Хранение истории загрузок в файле пригодится на тот случай, если приложение вдруг перезапустится, и история, хранимая runtime будет потеряна.

Внутри класс FileHistory имеет коллекцию, хранящую в себе FileItem, которые мы уже успешно скачали:

private static List downloadHistory; 


Добавление файла происходит очень просто — мы добавляем файл в коллекцию и сразу записываем изменения в xml:

public static void AddToDownloadHistory(FileItem file)
        {
            lock (historyLock)
            {
                XmlSerializer serializer = new XmlSerializer(typeof(List));

                using (var writer = GetXml())
                {
                    downloadHistory.Add(file);
                    serializer.Serialize(writer, downloadHistory);
                }
            }
        }


И вот что происходит, когда мы хотим проверить наличие файла в истории:

public static bool FileExists(FileItem file)
        {
            lock (historyLock)
            {
                if (downloadHistory.Count == 0)
                {
                    if (!TryRestoreHistoryFromXml())
                    {
                        return false;
                    }
                }

                return downloadHistory.Any(f => f.RefId == file.RefId);
            }
        }


Поясню — вот вызывается метод проверки. И записей в нашей коллекции ноль. Вероятно, приложение падало, и история утерялась. На этот случай попытаемся восстановить историю из xml. Если это не удается (файл отсутствует или повреждён) — считаем, что мы этот файл ещё не загружали.

Завершение


Я расчитываю, что эта статья поможет тем, кому тоже придётся писать свой ftp-клиент в первый раз, как и мне. Не претендую на то, что решение идеально. И это мой первый опыт написания статей на хабр, поэтому я открыт для критики и комментариев.

© Habrahabr.ru