Делайте что угодно со своими файлами, ну почти
У меня есть решение для тех случаев, когда ваш процессинг файлов довольно стандартный. Решение пока сыроватое, но уже способное делать интересные и полезные вещи.
Для начала уточню, что когда я говорю стандартный процессинг файлов, я имею в виду что-то вроде получить файл от клиента, проверить размер и тип, загрузить в S3. Туда же относятся некоторые задачи, для решения которых обычно пишут скриптики, которые проходят по директориям с файлами и, к примеру, запускают какую-нибудь команду для обработки каждого файла или группы файлов.
То, о чем я хочу рассказать называется Capyfile. Я бы назвал это конвейером для обработки файлов, который можно конфигурировать под свои самые разнообразные нужды. Код можно найти на GitHub. Написано все на Go. И для начала работы с ним, нужны всего две вещи:
конфигурационный файл, определяющий что конвейер будет делать
раннер который будет обрабатывать файлы в соответствии с инструкциями из конфигурационного файла
Теперь подробности.
Конфигурационный файл конвейера
Как уже говорилось ранее, первое, что нам нужно это конфигурационный файл. С помощью этого файла вы можете структурировать конвейеры и конфигурировать операции которые к ним принадлежат. А в данный момент есть поддержка двух форматов — YAML и JSON.
Типичный файл конфигурации выглядит так.
---
version: '1.2'
name: photos
processors:
- name: archive
operations:
# read the files from the directory
- name: filesystem_input_read
params:
target:
sourceType: value
source: "/home/user/Photos/*"
# check the file type
- name: file_type_validate
params:
allowedMimeTypes:
sourceType: value
source:
- image/jpeg
- image/x-canon-cr2
- image/heic
- image/heif
# if the file type is right, upload the file to S3
- name: s3_upload
targetFiles: without_errors
params:
accessKeyId:
sourceType: env_var
source: AWS_ACCESS_KEY_ID
secretAccessKey:
sourceType: env_var
source: AWS_SECRET_ACCESS_KEY
endpoint:
sourceType: value
source: "s3.amazonaws.com"
region:
sourceType: value
source: "us-east-1"
bucket:
sourceType: env_var
source: AWS_PHOTOS_BUCKET
# if the file type is right, and it is successfully uploaded to S3,
# remove the file from the filesystem
- name: filesystem_input_remove
targetFiles: without_errors
Здесь видны три основные сущности. Самая верхняя — это сервис. Затем следуют процессоры, и конечно же операции принадлежащие каждому процессору. Таким образом каждый конвейер имеет своего рода идентификатор, по которому к нему можно обращаться. Он состоит из имени сервиса и имени процессора, между которыми разделитель (двоеточие для консоли и слэш для сервера). Вам решать, как их именовать. К примеру, обратиться к конвейеру из конфигурационного файла выше можно используя идентификатор photos:archive
.
А теперь перейдём к операциям. Каждой конвейер может состоять из множества операции, и вы можете размещать их в любом порядке который в вашем случае имеет смысл. В данный момент доступны следующие из них:
http_multipart_form_input_read
— читаем HTTP запрос какmultipart/form-data
http_octet_stream_input_read
— читаем HTTP запрос какapplication/octet-stream
filesystem_input_read
— читаем из локальной файловой системыfilesystem_input_write
— пишем в локальную файловую системуfilesystem_input_remove
— удаляем из локальной файловой системыfile_size_validate
— проверяем размер файлаfile_type_validate
— проверяем MIME-тип файлаfile_time_validate
— проверяем временные метки файлаexiftool_metadata_cleanup
— чистим метаданные файла (нужен exiftool)image_convert
— конвертируем изображение в другой формат (нужен libvips)s3_upload
— загружаем файл в S3command_exec
— выполняем произвольную команду
Каждая из операций принимает такие параметры:
targetFiles
— указать операции какие файлы она должна обрабатыватьwithout_errors
(по умолчанию) — операция обработает файлы без ошибок (ошибок валидации либо любых других)with_errors
— операция обработает лишь файлы с ошибкамиall
— операция обработает все файлы
cleanupPolicy
— указать операции, что делать с созданными ею файлами когда пришло время подчищать мусорkeep_files
(по умолчанию) — оставляем созданные операцией файлыremove_files
— удаляем созданные операцией файлы
maxPacketSize
— указать операции количество файлов которые она будет обрабатывать за раз
И конечно же, операция сама по себе должна быть сконфигурирована. Здесь у каждой операции свои параметры, которые она принимает через свойство params
. Список параметров можно найти в документации на GitHub. Значения для них можно получить из следующих источников:
value
— берем значение из конфигурационного файлаenv_var
— берем значение из переменной окруженияsecret
— берем значение из секретаfile
— берем значение из файлаhttp_get
— берем значение из HTTP GET параметраhttp_post
— берем значение из HTTP POST параметраhttp_header
— берем значение из HTTP заголовкаetcd
— берем значение из etcd
Это практически все, что нужно знать о конфигурационном файле для конвейера.
Запуск конвейера
Допустим, мы написали конфигурационный файл для нашего конвейера. Как его теперь запустить? В данный момент доступно три варианта запуска:
capysvr
— HTTP сервер запускающий конвейер под каждый запросcapycmd
— консольное приложение для запуска конвейераcapyworker
— воркер запускающий конвейер с определенной периодичностью
Каждый из раннеров принимает как аргумент файл конфигурации. И для каждого из них есть возможность включить конкурентный режим.
Теперь, когда мы знаем как конвейер настроить и запустить, перейдем к примерам.
Пример 1: архивируем логи
К примеру, есть куча ротированных логов. И логи которые старше месяца, нужно сжать и архивировать в S3.
Напишем конфиг:
---
version: '1.2'
name: logs
processors:
- name: archive
operations:
- name: filesystem_input_read
cleanupPolicy: keep_files
params:
target:
sourceType: env_var
source: INPUT_READ_TARGET
- name: file_time_validate
params:
maxMtime:
sourceType: env_var
source: MAX_LOG_FILE_TIME_RFC3339
- name: command_exec
cleanupPolicy: remove_files
params:
commandName:
sourceType: value
source: gzip
commandArgs:
sourceType: value
source: ["{{.AbsolutePath}}"]
outputFileDestination:
sourceType: value
source: "{{.AbsolutePath}}.gz"
- name: command_exec
params:
commandName:
sourceType: value
source: aws
commandArgs:
sourceType: value
source: [
"s3",
"cp", "{{.AbsolutePath}}",
"s3://MY_LOGS_BUCKET_HERE/{{.Filename}}",
]
- name: filesystem_input_remove
params:
removeOriginalFile:
sourceType: value
source: false
И запустим через capycmd
:
INPUT_READ_TARGET=/var/logs/rotated-logs/* \
MAX_LOG_FILE_TIME_RFC3339=$(date -d "30 days ago" -u +"%Y-%m-%dT%H:%M:%SZ") \
capycmd -f logs.pipeline.yml -c logs:archive
Вывод будет выглядеть как-то так:
Running logs:archive service processor...
[/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_read FINISHED file read finished
[/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_read FINISHED file read finished
[/var/logs/rotated-logs/access-2023-12-23.log] filesystem_input_read FINISHED file read finished
[/var/logs/rotated-logs/access-2023-12-28.log] filesystem_input_read FINISHED file read finished
[/var/logs/rotated-logs/access-2023-12-28.log] file_time_validate STARTED file time validation started
[/var/logs/rotated-logs/access-2023-12-28.log] file_time_validate FINISHED file mtime is too new
[/var/logs/rotated-logs/access-2023-09-26.log] file_time_validate STARTED file time validation started
[/var/logs/rotated-logs/access-2023-09-26.log] file_time_validate FINISHED file time is valid
[/var/logs/rotated-logs/access-2023-09-28.log] file_time_validate STARTED file time validation started
[/var/logs/rotated-logs/access-2023-09-28.log] file_time_validate FINISHED file time is valid
[/var/logs/rotated-logs/access-2023-12-23.log] file_time_validate STARTED file time validation started
[/var/logs/rotated-logs/access-2023-12-23.log] file_time_validate FINISHED file mtime is too new
[/var/logs/rotated-logs/access-2023-09-26.log] command_exec STARTED command execution has started
[/var/logs/rotated-logs/access-2023-09-26.log] command_exec FINISHED command execution has finished
[/var/logs/rotated-logs/access-2023-09-28.log] command_exec STARTED command execution has started
[/var/logs/rotated-logs/access-2023-09-28.log] command_exec FINISHED command execution has finished
[/var/logs/rotated-logs/access-2023-12-28.log] command_exec SKIPPED skipped due to "without_errors" target files policy
[/var/logs/rotated-logs/access-2023-12-23.log] command_exec SKIPPED skipped due to "without_errors" target files policy
[/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_write STARTED file write started
[/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_write STARTED file write started
[/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_write FINISHED file write finished
[/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_write FINISHED file write finished
[/var/logs/rotated-logs/access-2023-12-28.log] filesystem_input_write SKIPPED skipped due to "without_errors" target files policy
[/var/logs/rotated-logs/access-2023-12-23.log] filesystem_input_write SKIPPED skipped due to "without_errors" target files policy
[/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_remove STARTED file remove started
[/var/logs/rotated-logs/access-2023-09-28.log] filesystem_input_remove FINISHED file remove finished
[/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_remove STARTED file remove started
[/var/logs/rotated-logs/access-2023-09-26.log] filesystem_input_remove FINISHED file remove finished
[/var/logs/rotated-logs/access-2023-12-28.log] filesystem_input_remove SKIPPED skipped due to "without_errors" target files policy
[/var/logs/rotated-logs/access-2023-12-23.log] filesystem_input_remove SKIPPED skipped due to "without_errors" target files policy
...
Пример 2: загружаем документы по HTTP
Допустим, нам нужен HTTP эндпоинт принимающий лишь документы, которые не больше 10МБ, и затем загружать их в S3 хранилище.
Как обычно, пишем конфиг:
version: '1.2'
name: documents
processors:
- name: upload
operations:
- name: http_multipart_form_input_read
- name: file_size_validate
params:
maxFileSize:
sourceType: value
source: 1048576
- name: file_type_validate
params:
allowedMimeTypes:
sourceType: value
source:
- application/pdf
- application/msword
- application/vnd.openxmlformats-officedocument.wordprocessingml.document
- name: s3_upload
params:
accessKeyId:
sourceType: secret
source: aws_access_key_id
secretAccessKey:
sourceType: secret
source: aws_secret_access_key
endpoint:
sourceType: env_var
source: AWS_ENDPOINT
region:
sourceType: env_var
source: AWS_REGION
bucket:
sourceType: env_var
source: AWS_DOCUMENTS_BUCKET
И запускаем с помощью capysvr завернутого в Docker:
docker run \
--name capyfile_server \
--mount type=bind,source=./docs.pipeline.yml,target=/etc/capyfile/docs.pipeline.yml \
--env CAPYFILE_SERVICE_DEFINITION_FILE=/etc/capyfile/docs.pipeline.yml \
--env AWS_ENDPOINT=s3.amazonaws.com \
--env AWS_REGION=us-west-1 \
--env AWS_DOCUMENTS_BUCKET=my_documents_bucket \
--secret aws_access_key_id \
--secret aws_secret_access_key \
-p 8024:80 \
capyfile/capysvr:latest
Теперь обратиться к нему по HTTP можно так:
curl \
-F "file1=@$HOME/Documents/document.pdf" \
-F "file2=@$HOME/Documents/document.docx" \
-F "file3=@$HOME/Documents/very-large-document.pdf" \
-F "file4=@$HOME/Documents/program.run" \
http://localhost/documents/upload
В ответ сервер вернет JSON подобного формата:
{
"status": "PARTIAL",
"code": "PARTIAL",
"message": "successfully uploaded 2 of 4 files",
"files": [
{
"url": "https://my_documents_bucket.s3.us-west-1.amazonaws.com/abcdKDNJW_DDWse.pdf",
"filename": "abcdKDNJW_DDWse.pdf",
"originalFilename": "document.pdf",
"mime": "application/pdf",
"size": 5892728,
"status": "SUCCESS",
"code": "FILE_SUCCESSFULLY_UPLOADED",
"message": "file successfully uploaded"
},
{
"url": "https://my_documents_bucket.s3.us-west-1.amazonaws.com/abcdKDNJW_DDWsd.docx",
"filename": "abcdKDNJW_DDWsd.docx",
"originalFilename": "document.docx",
"mime": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"size": 3145728,
"status": "SUCCESS",
"code": "FILE_SUCCESSFULLY_UPLOADED",
"message": "file successfully uploaded"
}
],
"errors": [
{
"originalFilename": "very-large-document.pdf",
"status": "ERROR",
"code": "FILE_IS_TOO_BIG",
"message": "file size can not be greater than 10 MB"
},
{
"originalFilename": "program.run",
"status": "ERROR",
"code": "FILE_MIME_TYPE_IS_NOT_ALLOWED",
"message": "file MIME type \"application/x-makeself\" is not allowed"
}
],
"meta": {
"totalUploads": 4,
"successfulUploads": 2,
"failedUploads": 2
}
}
В конце
Ну в общем такое я пишу по выходным. Роботы там еще много, но уже что-то вырисовывается, чему я даже немного рад. Кому идея понравилась, загляните в репозиторий.