[Из песочницы] Прямой доступ к диску из python

image

Расскажу я вам сегодня о том, как пытался я добраться из питона до интерфейса жесткого диска, и что из этого получилось.

Появляется у меня периодически необходимость тестирования большого количества жестких дисков. Обычно для этого используется досовая Victoria загружающаяся по сети. Она тестирует диски по одному, что не очень удобно. К тому же последнее время пошли платы не имеющие режима IDE, что дополнительно усложняет задачу. По началу у меня возникла идея взять готовый софт под линукс с открытыми исходниками и добавить ему возможность параллельного тестирования нескольких дисков. После беглого поиска выяснилось удручающее состояние этой области в линуксе. Из софта, ведущего при тестировании статистику по времени доступа к секторам и типам ошибок нашел только whdd. Попытка разобраться с кодом whdd закончилась полным провалом. Для меня, ни разу не программиста, код показался очень запутанным. К тому же большую его часть занимает совсем не работа с железом.

Поняв, что простого решения не предвидится я решил попробовать написать подобную программу самостоятельно. Понимая, что подобный проект на C я не осилю я начал изучать возможность прямой работы с дисками из python, которым я частенько пользуюсь для решения простых задач и люблю за простоту и понятность. Информации по этому вопросу в сети кот наплакал, но все же я выяснил, что существует модуль fcntl который в том числе позволяет отправлять устройству ioctl запросы. Теперь у меня появилась возможность отправлять команды диску. Но в линуксе все диски считаются scsi дисками, а для тестирования нужно передавать диску непосредственно ata команды. Оказалось существует механизм ATA Command Pass-Through, позволяющий обернуть ata команду в scsi запрос. Основную информацию о том, как это использовать удалось почерпнуть из исходных текстов проекта sg3_utils. Осталось попробовать реализовать это все на питоне.
Для того, чтобы создать в питоне структуры аналогичные структурам языка C, для последующей передачи их в ioctl, существует модуль ctypes. Отдельно стоит упомянуть количество седых волос появившихся в результате отладки странных глюков с этими структурами. Так я открыл для себя знание о выравнивании структур в C. В результате родились две структуры:

Структура для ATA Pass-Through:

class ataCmd(ctypes.Structure):
    _pack_ = 1
    _fields_ = [
        ('opcode', ctypes.c_ubyte),
        ('protocol', ctypes.c_ubyte),
        ('flags', ctypes.c_ubyte),
        ('features', ctypes.c_ushort),
        ('sector_count', ctypes.c_ushort),
        ('lba_h_low', ctypes.c_ubyte),
        ('lba_low', ctypes.c_ubyte),
        ('lba_h_mid', ctypes.c_ubyte),
        ('lba_mid', ctypes.c_ubyte),
        ('lba_h_high', ctypes.c_ubyte),
        ('lba_high', ctypes.c_ubyte),
        ('device', ctypes.c_ubyte),
        ('command', ctypes.c_ubyte),
        ('control', ctypes.c_ubyte)]


И структура для ioctl:

class sgioHdr(ctypes.Structure):
    _pack_ = 1
    _fields_ = [
        ('interface_id', ctypes.c_int),      # [i] 'S' for SCSI generic (required)
        ('dxfer_direction', ctypes.c_int),   # [i] data transfer direction
        ('cmd_len', ctypes.c_ubyte),         # [i] SCSI command length ( <= 16 bytes)
        ('mx_sb_len', ctypes.c_ubyte),       # [i] max length to write to sbp
        ('iovec_count', ctypes.c_ushort),    # [i] 0 implies no scatter gather
        ('dxfer_len', ctypes.c_uint),        # [i] byte count of data transfer
        ('dxferp', ctypes.c_void_p),         # [i], [*io] points to data transfer memory
        ('cmdp', ctypes.c_void_p),           # [i], [*i] points to command to perform
        ('sbp', ctypes.c_void_p),            # [i], [*o] points to sense_buffer memory
        ('timeout', ctypes.c_uint),          # [i] MAX_UINT->no timeout (unit: millisec)
        ('flags', ctypes.c_uint),            # [i] 0 -> default, see SG_FLAG...
        ('pack_id', ctypes.c_int),           # [i->o] unused internally (normally)
        ('usr_ptr', ctypes.c_void_p),        # [i->o] unused internally
        ('status', ctypes.c_ubyte),          # [o] scsi status
        ('masked_status', ctypes.c_ubyte),   # [o] shifted, masked scsi status
        ('msg_status', ctypes.c_ubyte),      # [o] messaging level data (optional)
        ('sb_len_wr', ctypes.c_ubyte),       # [o] byte count actually written to sbp
        ('host_status', ctypes.c_ushort),    # [o] errors from host adapter
        ('driver_status', ctypes.c_ushort),  # [o] errors from software driver
        ('resid', ctypes.c_int),             # [o] dxfer_len - actual_transferred
        ('duration', ctypes.c_uint),         # [o] time taken by cmd (unit: millisec)
        ('info', ctypes.c_uint)]             # [o] auxiliary information


Поскольку заполнение этих структур требуется перед каждой дисковой операцией и занимает много места, эта операция вынесена в отдельную функцию. В многобайтовых значениях нужно поменять порядок байтов.

def prepareSgio(cmd, feature, count, lba, direction, sense, buf):
    if direction == SG_DXFER_FROM_DEV:
        buf_len = ctypes.sizeof(buf)
        buf_p = ctypes.cast(buf, ctypes.c_void_p)
        prot = 4 << 1  # PIO Data-In
    elif direction == SG_DXFER_TO_DEV:
        buf_len = ctypes.sizeof(buf)
        buf_p = ctypes.cast(buf, ctypes.c_void_p)
        prot = 5 << 1  # PIO Data-Out
    else:
        buf_len = 0
        buf_p = None
        prot = 3 << 1  # Non-data

    if cmd != 0xb0:  # not SMART COMMAND
        prot = prot | 1  # + EXTEND
    sector_lba = lba.to_bytes(6, byteorder='little')

    ata_cmd = ataCmd(opcode=0x85,  # ATA PASS-THROUGH (16)
                     protocol=prot,
                     # flags field
                     # OFF_LINE = 0 (0 seconds offline)
                     # CK_COND = 1 (copy sense data in response)
                     # T_DIR = 1 (transfer from the ATA device)
                     # BYT_BLOK = 1 (length is in blocks, not bytes)
                     # T_LENGTH = 2 (transfer length in the SECTOR_COUNT field)
                     flags=0x2e,
                     features=swap16(feature),
                     sector_count=swap16(count),
                     lba_h_low=sector_lba[3], lba_low=sector_lba[0],
                     lba_h_mid=sector_lba[4], lba_mid=sector_lba[1],
                     lba_h_high=sector_lba[5], lba_high=sector_lba[2],
                     device=0,
                     command=cmd,
                     control=0)

    sgio = sgioHdr(interface_id=ASCII_S, dxfer_direction=direction,
                   cmd_len=ctypes.sizeof(ata_cmd),
                   mx_sb_len=ctypes.sizeof(sense), iovec_count=0,
                   dxfer_len=buf_len,
                   dxferp=buf_p,
                   cmdp=ctypes.addressof(ata_cmd),
                   sbp=ctypes.cast(sense, ctypes.c_void_p), timeout=1000,
                   flags=0, pack_id=0, usr_ptr=None, status=0, masked_status=0,
                   msg_status=0, sb_len_wr=0, host_status=0, driver_status=0,
                   resid=0, duration=0, info=0)

    return sgio


Эта функция принимает ata команду, параметры и буферы, а возвращает готовую структуру для ioctl запроса. Дальше все просто. Создаем буфер в котором вернутся статус выполнения команды и содержимое ata регистров статуса и ошибки. Создаем буфер для сектора, прочитанного с диска. Заполняем структуры и выполняем нашу первую ata команду.

sense = ctypes.c_buffer(64)
identify = ctypes.c_buffer(512)
sgio = prepareSgio(0xec, 0, 0, 0, SG_DXFER_FROM_DEV, sense, identify)  # IDENTIFY
with open(dev, 'r') as fd:
    if fcntl.ioctl(fd, SG_IO, ctypes.addressof(sgio)) != 0:
        return None  # fcntl failed!


В ответ получаем сектор с информацией о диске:

0000000: 5a04 ff3f 37c8 1000 0000 0000 3f00 0000  Z..?7.......?...
0000010: 0000 0000 2020 2020 2020 4b4a 3131 3142  ....      KJ111B
0000020: 3942 5647 4142 4659 0300 5fea 3800 4b4a  9BVGABFY.._.8.KJ
0000030: 4f41 3341 4145 6948 6174 6863 2069 5548  OA3AAEiHathc iUH
0000040: 3741 3232 3230 4130 414c 3333 2030 2020  7A2220A0AL33 0  
0000050: 2020 2020 2020 2020 2020 2020 2020 1080                ..
0000060: 0040 002f 0040 0002 0002 0700 ff3f 1000  .@./.@.......?..
0000070: 3f00 10fc fb00 0001 ffff ff0f 0000 0700  ?...............
0000080: 0300 7800 7800 7800 7800 0000 0000 0000  ..x.x.x.x.......
0000090: 0000 0000 0000 1f00 0617 0000 5e00 4400  ............^.D.
00000a0: fc01 2900 6b34 697d 7347 6934 41bc 6347  ..).k4i}sGi4A.cG
00000b0: 7f40 0401 0000 0000 feff 0000 0000 0800  .@..............
00000c0: ca00 f900 1027 0000 b088 e0e8 0000 0000  .....'..........
00000d0: ca00 0000 0000 875a 0050 a2cc cb22 44fc  .......Z.P..."D.
00000e0: 0000 0000 0000 0000 0000 0000 0000 1440  ...............@
00000f0: 1440 0000 0000 0000 0000 0000 0000 0000  .@..............
0000100: 0100 0b00 0000 0000 8020 f10d 20fa 0100  ......... .. ...
0000110: 0040 0404 0403 0000 0000 0502 0604 0504  .@..............
0000120: 0506 0803 0506 0504 0505 0603 0505 0000  ................
0000130: 3741 3342 0000 0a78 0000 bd5d d3a1 0080  7A3B...x...]....
0000140: 0000 0000 0000 0000 0000 0000 0000 0000  ................
0000150: 0200 0000 0000 0000 0000 0000 0000 0000  ................
0000160: 0000 0000 0000 0000 0000 0000 0000 0000  ................
0000170: 0000 0000 0000 0000 0000 0000 0000 0000  ................
0000180: 0000 0000 0000 0000 0000 0000 0000 0000  ................
0000190: 0000 0000 0000 0000 0000 0000 3d00 0000  ............=...
00001a0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00001b0: 0000 201c 0000 0000 0000 0000 1f10 2100  .. ...........!.
00001c0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00001d0: 0000 0000 0100 e003 0000 0000 0000 0000  ................
00001e0: 0000 0000 0000 0000 0000 0000 0000 0000  ................
00001f0: 0000 0000 0000 0000 0000 0000 0000 a503  ................


В нем содержится полная информация о диске, извлечем основную.

    serial = swapString(identify[20:40])
    firmware = swapString(identify[46:53])
    model = swapString(identify[54:93])
    sectors = int.from_bytes(identify[200] + identify[201] + identify[202] + identify[203] +
                             identify[204] + identify[205] + identify[206] + identify[207], byteorder='little')


В результате получаем:

модель: Hitachi HUA722020ALA330; прошивка: JKAOA3; серийный номер: JK11A1YAJE2N5V; число секторов: 3907029168.

Теперь мы умеем отправлять ata команды диску и получать от него ответы. Потихоньку результат моей работы оформился в библиотечку, содержащую реализацию основного набора ata команд, включая чтение SMART. Кому интересно можно взглянуть на нее здесь. Качество кода сильно не ругайте, я не волшебник программист я только учусь.

Теперь осталось с ее помощью написать утилиту тестирования. Чувствую меня ждет еще много открытий.

© Habrahabr.ru