[Из песочницы] Автономный ftp-client с докачкой файлов
Хочу поделиться своим опытом в разработке автономного ftp клиента.
В наличии имеется ftp-сервер, на котором периодически появляются данные в виде графических изображений и текстовых файлов, их размер варьируется от десятков килобайт до пары гигабайт. Доступ в интернет может быть через провод, а может быть через GSM-свисток или вообще по спутнику, то есть стабильным и нестабильным соответственно. Во втором случае резко повышается вероятность потери соединения из-за погодных условий, географического положения и т.п.
Итак, требования к клиенту следующие:
- Опрос ftp-сервера на наличие новых файлов и их последующая загрузка.
- В случае внезапной остановки загрузки (будь то обрыв соединения, или упадёт система, на которой стоит мой ftp-клиент), закачка должна продолжиться при первой возможности.
- Ограничение скорости загрузки (это связано со стоимостью трафика по GSM).
Если интересен мой способ решения задачи, прошу под кат!
Для удобства можно разбить всю статью на ключевые этапы работы клиента, с примерами кода и более подробным описанием тонкостей работы.
Постановка задачи
Подумав, я решил написать клиент, который работает по следующей схеме:
- Стучимся на сервер, получаем список файлов.
- Смотрим в свою историю загрузок, если файл отсутствует в истории, то добавляем файл в очередь загрузки.
- Если файл по каким-то причинам не удалось скачать, он отправляется в конец очереди загрузки.
- Успешно загруженный файл добавляется в историю.
И некоторые особенности:
- История хранится runtime и дублируется в xml-файл, откуда можно будет восстановить историю
- Клиент поддерживает загрузку нескольких файлов одновременно в разных потоках
Периодический опрос сервера и получение списка файлов
Решение периодического опроса сервера приходит на ум почти сразу — запустить таймер, в котором будет заключён метод получения списка файлов. Однако, сервер имеет немного своеобразную структуру каталогов. Если говорить коротко, то на сервере есть две папки — notify и files. В папке files хранятся сами данные, которые требуется скачать, и все они имеют уникальные имена по типу FILE_ID_xxx, где х — любая цифра. Папка notify содержит xml-файлы с описанием файлов из папки files, в т. ч. их настоящее имя, дату размещения на сервере и размер.
Прочитав все xml из папки notify, я формирую коллекцию из нехитрого FileItem:
public class FileItem
{
[XmlAttribute(AttributeName = "RemoteUri")]
public string RemoteUri;
[XmlAttribute(AttributeName = "SavePath")]
public string SavePath;
[XmlAttribute(AttributeName = "Date")]
public string Date;
[XmlAttribute(AttributeName = "RefId")]
public string RefId;
[XmlAttribute(AttributeName = "Name")]
public string Name;
[XmlAttribute(AttributeName = "Extention")]
public string Extention;
[XmlAttribute(AttributeName = "Size")]
public long Size;
}
А далее, пробегаясь по коллекции, проверяем, присутствует ли файл в истории загрузок, и не загружается ли он в данный момент
foreach (var df in dataFiles)
{
if (!FileHistory.FileExists(df) && !client.AlreadyInProgress(df))
{
client.DownloadFile(df);
}
}
Вот и всё. Опрос сервера и поиск новых файлов закончен. О том, кто такие FileHistory и client — напишу далее.
Загрузка файлов в несколько потоков
»client» в коде выше — это экземпляр класса FTPClient, занимающегося только загрузкой файлов с сервера. И по факту FTPClient — моя обертка FtpWebRequest.
FTPClient имеет в себе потокобезопасную очередь, называемую «очередью загрузки»:
private ConcurrentQueue downloadQueue;
Итак, что происходит при вызове метода DownloadFile:
public void DownloadFile(FileItem file)
{
downloadQueue.Enqueue(file);
StartDownloadTask();
}
Всё довольно просто — файл добавляется в очередь на загрузку, и вслед за этим вызывается метод, который создает задачу по загрузке файла, используя TPL. Вот как это выглядит:
private void StartDownloadTask()
{
if (currentActiveDownloads <= Settings.MaximumDownloadThreads)
{
FileItem file;
if (!downloadQueue.IsEmpty && downloadQueue.TryDequeue(out file))
{
Task t;
if (File.Exists(file.SavePath))
{
FileInfo info = new FileInfo(file.SavePath);
var currentSize = info.Length;
t = new Task(() => DownloadTask(file, currentSize));
}
else
{
t = new Task(() => DownloadTask(file, 0));
}
t.ContinueWith(OnTaskComplete);
t.Start();
Interlocked.Increment(ref currentActiveDownloads);
lock (inProgressLock)
{
inProgress.Add(file);
}
}
}
Говоря русским языком, сначала проверим сколько уже работает тасков по загрузке файла, и если есть возможность пропихнуть ещё один. Затем пытаемся получить FileItem из очереди загрузки, если очередь не пуста. Потом определяем, присутствует ли файл уже локально, или нет. А локально присутствовать файл может в том случае, если загрузка неожиданно прервалась. Всё, что мы успели скачать, остаётся на диске. Так вот, в этом случае мы просто начнём загрузку с того места, на котором остановились.
Метод OnTaskComplete, который вызовется после завершения DownloadTask:
private void OnTaskComplete(Task t)
{
Interlocked.Decrement(ref currentActiveDownloads);
StartDownloadTask();
}
То есть уменьшим счетчик активных загрузок, и попробуем начать новый таск загрузки. То есть, получается что новый таск на загрузку будет создаваться при добавлении нового файла в очередь загрузки и при завершении текущего таска на загрузку.
Теперь метод, занимающийся непосредственно загрузкой файла с сервера:
private void DownloadTask(FileItem file, long offset)
{
// Перед началом загрузки поставим поток на паузу. В случае, если файл не доступен по какой-то причине, то мы не будем спамить на сервер в попытках достучаться до него
Thread.Sleep(10 * 1000);
Log.Info(string.Format("Загружается файл {0}", file.Name));
try
{
if (offset == file.Size)
{
Log.Info(string.Format("Файл {0} уже полностью скачан.", file.Name));
FileHistory.AddToDownloadHistory(file);
return;
}
using (var readStream = GetResponseStreamFromServer(file.RemoteUri, WebRequestMethods.Ftp.DownloadFile, offset))
{
using (var writeStream = new FileStream(file.SavePath, FileMode.Append, FileAccess.Write))
{
var bufferSize = 1024;
var buffer = new byte[bufferSize];
int second = 1000;
int timePassed = 0;
var stopWatch = new Stopwatch();
var readCount = readStream.Read(buffer, 0, bufferSize);
int downloadedBytes = readCount;
while(readCount > 0)
{
// Считаем данные потока и засечём сколько на это ушло времени
stopWatch.Start();
writeStream.Write(buffer, 0, readCount);
readCount = readStream.Read(buffer, 0, bufferSize);
stopWatch.Stop();
// Если скорость ограничена (0 считается за отсутствие ограничения)
if (Settings.MaximumDownloadSpeed > 0)
{
var downloadLimit = (Settings.MaximumDownloadSpeed * 1024 / 8) / currentActiveDownloads;
downloadedBytes += readCount;
timePassed += (int)stopWatch.ElapsedMilliseconds;
if (downloadedBytes >= downloadLimit)
{
var pause = second - timePassed;
if (pause > 0)
Thread.Sleep(pause);
timePassed = 0;
downloadedBytes = 0;
stopWatch.Reset();
}
if (timePassed > second)
{
stopWatch.Reset();
timePassed = 0;
downloadedBytes = 0;
}
}
}
}
}
lock (inProgressLock)
{
inProgress.Remove(file);
}
FileHistory.AddToDownloadHistory(file);
Log.Info(string.Format("Файл загружен - {0}", file.Name));
Interlocked.Add(ref currentLoadedSize, -file.Size);
}
catch (WebException e)
{
Log.Error(e);
downloadQueue.Enqueue(file);
}
catch (Exception e)
{
Log.Error(e);
}
}
И метод, который формирует запрос к серверу и возвращает ответ:
private Stream GetResponseStreamFromServer(string uri, string method, long offset)
{
var request = (FtpWebRequest)WebRequest.Create(uri);
request.UseBinary = true;
request.Credentials = new NetworkCredential(Settings.Login, Settings.Password);
request.Method = method;
request.Proxy = null;
request.KeepAlive = false;
request.ContentOffset = offset;
var response = request.GetResponse();
return response.GetResponseStream();
}
То есть, чтобы начать чтение потока не с начала, используется строка при формировании запроса: ё
request.ContentOffset = offset;
А ограничение скорости работает следующим образом: первым делом расчитаем downloadLimit, сколько байт может загрузить текущий поток. Учитывается общее ограничение скорости и количество активных потоков загрузки. Затем читаем поток 1024 байта. Засекли, сколько времени это заняло (timePassed). Общее количество считанных байт записывается в downloadedBytes.
При превышении лимита ставим поток на паузу на оставшееся время до конца секунды:
var pause = second - timePassed;
if (pause > 0)
Thread.Sleep(pause);
По истечению секунды счётчики обнуляются.
И в случае WebExeption файл снова добавится в очередь загрузки. А в историю файл попадёт только после успешного завершения.
История загрузок
Хранение истории загрузок в файле пригодится на тот случай, если приложение вдруг перезапустится, и история, хранимая runtime будет потеряна.
Внутри класс FileHistory имеет коллекцию, хранящую в себе FileItem, которые мы уже успешно скачали:
private static List downloadHistory;
Добавление файла происходит очень просто — мы добавляем файл в коллекцию и сразу записываем изменения в xml:
public static void AddToDownloadHistory(FileItem file)
{
lock (historyLock)
{
XmlSerializer serializer = new XmlSerializer(typeof(List));
using (var writer = GetXml())
{
downloadHistory.Add(file);
serializer.Serialize(writer, downloadHistory);
}
}
}
И вот что происходит, когда мы хотим проверить наличие файла в истории:
public static bool FileExists(FileItem file)
{
lock (historyLock)
{
if (downloadHistory.Count == 0)
{
if (!TryRestoreHistoryFromXml())
{
return false;
}
}
return downloadHistory.Any(f => f.RefId == file.RefId);
}
}
Поясню — вот вызывается метод проверки. И записей в нашей коллекции ноль. Вероятно, приложение падало, и история утерялась. На этот случай попытаемся восстановить историю из xml. Если это не удается (файл отсутствует или повреждён) — считаем, что мы этот файл ещё не загружали.
Завершение
Я расчитываю, что эта статья поможет тем, кому тоже придётся писать свой ftp-клиент в первый раз, как и мне. Не претендую на то, что решение идеально. И это мой первый опыт написания статей на хабр, поэтому я открыт для критики и комментариев.