Как получать музыку из ВКонтакте в 2022 году

Началось все с того, что мне захотелось написать музыкального бота для своего discord сервера.

При проектировании проекта, я решил разделить его на две части. Первая — получение музыки из ВК. Вторая — сам бот. И начать я решил с первой части.

Поиск какой-либо информации на этот счет или уже возможно готового куска кода не принес никаких результатов из-за чего очевидным решением данной проблемы было то, что придется разбираться с этим самому.

Я решил посмотреть что сейчас отдает ВКонтакте при воспроизведении записи и полез во вкладку network, вот что я там увидел:

Фото

Нас интересует index.m3u8Нас интересует index.m3u8Открыв его мы видим GET запрос на сгенерированный ВКонтакте urlОткрыв его мы видим GET запрос на сгенерированный ВКонтакте urlА ответ этого запроса представляет из себя просто HLS формат, с сегментами и их ключами декодирования если они закодированыА ответ этого запроса представляет из себя просто HLS формат, с сегментами и их ключами декодирования если они закодированы

Теперь передо мной стояла новая задача, как получить с определенного аудио нужную ссылку на m3u8 файл и уже потом думать как его разбирать и собирать в дальнейшем в цельным mp3 файл.

В ходе раздумий был найден довольно простой вариант в виде библиотеки для питона vk_api и реализация получения такой ссылки через эту библиотеку выглядит так:

from vk_api import VkApi
from vk_api.audio import VkAudio

login = "+7XXXXXXXXXX"
password = "your_password"

vk_session = VKApi(
  login=login,
  password=password,
  api_version='5.81'
)
vk_session.auth()

vk_audio = VKAudio(vk_session)

# Делаем поиск аудио по названию
# Так же можно получать аудио со страницы функцией .get_iter(owner_id)
# где owner_id это айди страницы
# или же можно получить аудио с альбома, где мы сначала получаем айди альбомов
# функцией .get_albums_iter()
# и после снова вызываем .get_iter(owner_id, album_id), где album_id полученный
# айди альбома
q = "audio name"
audio = next(vk_audio.search_iter(q=q))
url = audio['url'] # получаем ту длиннющую ссылку на m3u8 файл

Вот мы и получили ссылку на этот файл и встал вопрос, а что делать дальше. Я попробовал запихнуть эту ссылку в ffmpeg и уже было обрадовался, ведь он скачал мой заветный аудиофайл и сразу же сделал конвертацию в mp3, однако, счастье мое длилось не долго, ведь ffmpeg хоть и скачал все сегменты, самостоятельно склеив их, но зашифрованные сегменты он не расшифровал, поэтому давайте еще раз взглянем на внутренности m3u8 файла

#EXTM3U
#EXT-X-TARGETDURATION:25
#EXT-X-ALLOW-CACHE:YES
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-KEY:METHOD=AES-128,URI="https://cs1-66v4.vkuseraudio.net/s/v1/ac/wYaompMqHNQpBIH183wK68QVW45tvaJLaznkPiqES66JM-xzffiiM4KQx5WPS0Vg99U9ggCDronPKO8bzit3v_j8fH6LymN2pngBXYTv5uaDnFiAfc2aXv848bhRJEyFVB1gaJw1VR4BS9WnSb8jIMd0haPgfvJMcWC7FW7wpFkGU14/key.pub"
#EXT-X-VERSION:3
#EXT-X-MEDIA-SEQUENCE:1
#EXTINF:2.000,
seg-1-a1.ts
#EXT-X-KEY:METHOD=NONE
#EXTINF:4.000,
seg-2-a1.ts
#EXTINF:20.000,
seg-3-a1.ts
#EXT-X-KEY:METHOD=AES-128,URI="https://cs1-66v4.vkuseraudio.net/s/v1/ac/wYaompMqHNQpBIH183wK68QVW45tvaJLaznkPiqES66JM-xzffiiM4KQx5WPS0Vg99U9ggCDronPKO8bzit3v_j8fH6LymN2pngBXYTv5uaDnFiAfc2aXv848bhRJEyFVB1gaJw1VR4BS9WnSb8jIMd0haPgfvJMcWC7FW7wpFkGU14/key.pub"
#EXTINF:20.000,
seg-4-a1.ts
#EXT-X-KEY:METHOD=NONE
#EXTINF:25.444,
seg-5-a1.ts
#EXT-X-ENDLIST

Мы видим, что перед зашифрованными сегментами в EXT-X-KEY указан метод шифровки AES-128 и ссылка на скачку ключа для расшифровки.

Для решения уже этой проблемы была найдена прекрасная библиотека m3u8 и pycryptodome:

import m3u8
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad

# Получаем этот самый m3u8 файл
m3u8_data = m3u8.load(
  url="" # Вставляем наш полученный ранее url
)
segments = m3u8.data.get("segments")

# Парсим файл в более удобный формат
segments_data = {}

for segment in segments:
  segment_uri = segment.get("uri")  
  extended_segment = {
    "segment_method": None,
    "method_uri": None
  }
  if segment.get("key").get("method") == "AES-128":
    extended_segment["segment_method"] = True
    extended_segment["method_uri"] = segment.get("key").get("uri")
    
	segments_data[segment_uri] = extended_segment
  
# И наконец качаем все сегменты с расшифровкой
uris = segments_data.keys()
for uri in uris:
  # Используем начальный url где мы подменяем index.m3u8 на наш сегмент
  audio = requests.get(url=index_url.replace("index.m3u8", uri))
  # Сохраняем .ts файл
  open(f"../m3u8_downloader/segments/{uri}", "wb").write(audio.content)
  # Если у сегмента есть метод, то расшифровываем его
  if segments_data.get(uri).get("segment_method") is not None:
    # Качаем ключ
    key_uri = segments_data.get(uri).get("method_uri")
    key = requests.get(url=key_uri)
    open(f"../m3u8_downloader/keys/key.pub", "wb").write(key.content)
    
   	# Открываем .ts файл
    f = open(f"../m3u8_downloader/segments/{uri}", "rb")
    # Читаем только первые 16 символов для расшифровки
    iv = f.read(16)
    # Читаем все остальное
    ciphered_data = f.read()
    
    # Открываем ключ
    key = open(f"../m3u8_downloader/keys/key.pub", "rb").read()
    # Расшифровываем
    cipher = AES.new(
      key,
      AES.MODE.CBC,
      iv=iv
    )
    data = unpad(cipher.decrypt(ciphered_data), AES.block_size)
    
    # перезаписываем .ts файл в уже расшифрованный и удаляем ключ из директории
    open(f"../m3u8_downloader/segments/{uri}", "wb").write(data)
    os.remove(f"../m3u8_downloader/keys/key.pub")
  

После чего собираем все сегменты в один .ts файл:

# путь где храним все сегменты и файлы внутри папки
segments_path = "segments/"
segments_file = os.listdir(segments_path)

for file in segments_file:
  f = open(f"../m3u8_downloader/{segments_path}/{file}", "rb").read()
  open("../m3u8_downloader/mp3/temp.ts", "ab").write(f)

И наконец конвертируем все в mp3 формат, для чего нам понадобиться установленный ffmpeg на ПК.

import os

os.system('ffmpeg -i "../m3u8_downloader/mp3/temp.ts" "../m3u8_downloader/mp3/temp.mp3"')
os.remove("../m3u8_downloader/mp3/temp.ts")

После чего можем спокойно удалять уже ненужные сегменты.

segments_path = "segments/"
segments_file = os.listdir(segments_path)
for file in segments_file:
	os.remove(segments_path + file)

Для меня это был довольно интересный опыт, поскольку я никогда до этого в своей жизни не работал с зашифрованными файлами и HLS протоколом, надеюсь Вам тоже было интересно читать это. Так же надеюсь я смог помочь другим людям, ведь никаких решений по скачиванию аудио с ВКонтакте на питоне в 2022 году я не нашел.

Так же выложу весь код:

Hidden text

import os
import m3u8
import requests
from vk_api import VkApi
from vk_api.audio import VkAudio
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad


class M3U8Downloader:

    def __init__(self, login: str, password: str):

        self._vk_session = VkApi(
            login=login,
            password=password,
            api_version='5.81'
        )
        self._vk_session.auth()

        self._vk_audio = VkAudio(self._vk_session)

    def download_audio(self, q: str):
        url = self._get_audio_url(q=q)
        segments = self._get_audio_segments(url=url)
        segments_data = self._parse_segments(segments=segments)
        self._download_segments(segments_data=segments_data, index_url=url)
        self._compile_audio()
        self._convert_ts_to_mp3()
        self._delete_segments()

    @staticmethod
    def _delete_segments():
        segments_path = "segments/"
        segments_file = os.listdir(segments_path)
        for file in segments_file:
            os.remove(segments_path + file)

    @staticmethod
    def _convert_ts_to_mp3():
        os.system('ffmpeg -i "../m3u8_downloader/mp3/temp.ts" "../m3u8_downloader/mp3/temp.mp3"')
        os.remove("../m3u8_downloader/mp3/temp.ts")

    @staticmethod
    def _compile_audio():
        segments_path = "segments/"
        segments_file = os.listdir(segments_path)
        for file in segments_file:
            f = open(f"../m3u8_downloader/{segments_path}/{file}", "rb").read()
            open("../m3u8_downloader/mp3/temp.ts", "ab").write(f)

    def _get_audio_url(self, q: str):
        self._vk_audio.get_albums_iter()
        audio = next(self._vk_audio.search_iter(q=q))
        url = audio['url']
        return url

    @staticmethod
    def _get_audio_segments(url: str):
        m3u8_data = m3u8.load(
            uri=url
        )
        return m3u8_data.data.get("segments")

    @staticmethod
    def _parse_segments(segments: list):
        segments_data = {}

        for segment in segments:
            segment_uri = segment.get("uri")

            extended_segment = {
                "segment_method": None,
                "method_uri": None
            }
            if segment.get("key").get("method") == "AES-128":
                extended_segment["segment_method"] = True
                extended_segment["method_uri"] = segment.get("key").get("uri")
            segments_data[segment_uri] = extended_segment
        return segments_data

    @staticmethod
    def _download_segments(segments_data: dict, index_url: str):
        uris = segments_data.keys()
        for uri in uris:
            audio = requests.get(url=index_url.replace("index.m3u8", uri))
            open(f"../m3u8_downloader/segments/{uri}", "wb").write(audio.content)
            if segments_data.get(uri).get("segment_method") is not None:
                key_uri = segments_data.get(uri).get("method_uri")
                M3U8Downloader._download_key(key_uri=key_uri)

                f = open(f"../m3u8_downloader/segments/{uri}", "rb")
                iv = f.read(16)
                ciphered_data = f.read()

                key = open(f"../m3u8_downloader/keys/key.pub", "rb").read()
                cipher = AES.new(
                    key,
                    AES.MODE_CBC,
                    iv=iv
                )
                data = unpad(cipher.decrypt(ciphered_data), AES.block_size)
                open(f"../m3u8_downloader/segments/{uri}", "wb").write(data)
                os.remove(f"../m3u8_downloader/keys/key.pub")

    @staticmethod
    def _download_key(key_uri: str):
        key = requests.get(url=key_uri)
        open(f"../m3u8_downloader/keys/key.pub", "wb").write(key.content)
       
      
login = "" # phone
password = "" # password
md = M3U8Downloader(login=login, password=password)

q = "Воллны Волны" # Запрос музыки по названию
md.download_audio()

© Habrahabr.ru