[Из песочницы] Прямой доступ к диску из python
Расскажу я вам сегодня о том, как пытался я добраться из питона до интерфейса жесткого диска, и что из этого получилось.
Появляется у меня периодически необходимость тестирования большого количества жестких дисков. Обычно для этого используется досовая Victoria загружающаяся по сети. Она тестирует диски по одному, что не очень удобно. К тому же последнее время пошли платы не имеющие режима IDE, что дополнительно усложняет задачу. По началу у меня возникла идея взять готовый софт под линукс с открытыми исходниками и добавить ему возможность параллельного тестирования нескольких дисков. После беглого поиска выяснилось удручающее состояние этой области в линуксе. Из софта, ведущего при тестировании статистику по времени доступа к секторам и типам ошибок нашел только whdd. Попытка разобраться с кодом whdd закончилась полным провалом. Для меня, ни разу не программиста, код показался очень запутанным. К тому же большую его часть занимает совсем не работа с железом.
Поняв, что простого решения не предвидится я решил попробовать написать подобную программу самостоятельно. Понимая, что подобный проект на C я не осилю я начал изучать возможность прямой работы с дисками из python, которым я частенько пользуюсь для решения простых задач и люблю за простоту и понятность. Информации по этому вопросу в сети кот наплакал, но все же я выяснил, что существует модуль fcntl который в том числе позволяет отправлять устройству ioctl запросы. Теперь у меня появилась возможность отправлять команды диску. Но в линуксе все диски считаются scsi дисками, а для тестирования нужно передавать диску непосредственно ata команды. Оказалось существует механизм ATA Command Pass-Through, позволяющий обернуть ata команду в scsi запрос. Основную информацию о том, как это использовать удалось почерпнуть из исходных текстов проекта sg3_utils. Осталось попробовать реализовать это все на питоне.
Для того, чтобы создать в питоне структуры аналогичные структурам языка C, для последующей передачи их в ioctl, существует модуль ctypes. Отдельно стоит упомянуть количество седых волос появившихся в результате отладки странных глюков с этими структурами. Так я открыл для себя знание о выравнивании структур в C. В результате родились две структуры:
Структура для ATA Pass-Through:
class ataCmd(ctypes.Structure):
_pack_ = 1
_fields_ = [
('opcode', ctypes.c_ubyte),
('protocol', ctypes.c_ubyte),
('flags', ctypes.c_ubyte),
('features', ctypes.c_ushort),
('sector_count', ctypes.c_ushort),
('lba_h_low', ctypes.c_ubyte),
('lba_low', ctypes.c_ubyte),
('lba_h_mid', ctypes.c_ubyte),
('lba_mid', ctypes.c_ubyte),
('lba_h_high', ctypes.c_ubyte),
('lba_high', ctypes.c_ubyte),
('device', ctypes.c_ubyte),
('command', ctypes.c_ubyte),
('control', ctypes.c_ubyte)]
И структура для ioctl:
class sgioHdr(ctypes.Structure):
_pack_ = 1
_fields_ = [
('interface_id', ctypes.c_int), # [i] 'S' for SCSI generic (required)
('dxfer_direction', ctypes.c_int), # [i] data transfer direction
('cmd_len', ctypes.c_ubyte), # [i] SCSI command length ( <= 16 bytes)
('mx_sb_len', ctypes.c_ubyte), # [i] max length to write to sbp
('iovec_count', ctypes.c_ushort), # [i] 0 implies no scatter gather
('dxfer_len', ctypes.c_uint), # [i] byte count of data transfer
('dxferp', ctypes.c_void_p), # [i], [*io] points to data transfer memory
('cmdp', ctypes.c_void_p), # [i], [*i] points to command to perform
('sbp', ctypes.c_void_p), # [i], [*o] points to sense_buffer memory
('timeout', ctypes.c_uint), # [i] MAX_UINT->no timeout (unit: millisec)
('flags', ctypes.c_uint), # [i] 0 -> default, see SG_FLAG...
('pack_id', ctypes.c_int), # [i->o] unused internally (normally)
('usr_ptr', ctypes.c_void_p), # [i->o] unused internally
('status', ctypes.c_ubyte), # [o] scsi status
('masked_status', ctypes.c_ubyte), # [o] shifted, masked scsi status
('msg_status', ctypes.c_ubyte), # [o] messaging level data (optional)
('sb_len_wr', ctypes.c_ubyte), # [o] byte count actually written to sbp
('host_status', ctypes.c_ushort), # [o] errors from host adapter
('driver_status', ctypes.c_ushort), # [o] errors from software driver
('resid', ctypes.c_int), # [o] dxfer_len - actual_transferred
('duration', ctypes.c_uint), # [o] time taken by cmd (unit: millisec)
('info', ctypes.c_uint)] # [o] auxiliary information
Поскольку заполнение этих структур требуется перед каждой дисковой операцией и занимает много места, эта операция вынесена в отдельную функцию. В многобайтовых значениях нужно поменять порядок байтов.
def prepareSgio(cmd, feature, count, lba, direction, sense, buf):
if direction == SG_DXFER_FROM_DEV:
buf_len = ctypes.sizeof(buf)
buf_p = ctypes.cast(buf, ctypes.c_void_p)
prot = 4 << 1 # PIO Data-In
elif direction == SG_DXFER_TO_DEV:
buf_len = ctypes.sizeof(buf)
buf_p = ctypes.cast(buf, ctypes.c_void_p)
prot = 5 << 1 # PIO Data-Out
else:
buf_len = 0
buf_p = None
prot = 3 << 1 # Non-data
if cmd != 0xb0: # not SMART COMMAND
prot = prot | 1 # + EXTEND
sector_lba = lba.to_bytes(6, byteorder='little')
ata_cmd = ataCmd(opcode=0x85, # ATA PASS-THROUGH (16)
protocol=prot,
# flags field
# OFF_LINE = 0 (0 seconds offline)
# CK_COND = 1 (copy sense data in response)
# T_DIR = 1 (transfer from the ATA device)
# BYT_BLOK = 1 (length is in blocks, not bytes)
# T_LENGTH = 2 (transfer length in the SECTOR_COUNT field)
flags=0x2e,
features=swap16(feature),
sector_count=swap16(count),
lba_h_low=sector_lba[3], lba_low=sector_lba[0],
lba_h_mid=sector_lba[4], lba_mid=sector_lba[1],
lba_h_high=sector_lba[5], lba_high=sector_lba[2],
device=0,
command=cmd,
control=0)
sgio = sgioHdr(interface_id=ASCII_S, dxfer_direction=direction,
cmd_len=ctypes.sizeof(ata_cmd),
mx_sb_len=ctypes.sizeof(sense), iovec_count=0,
dxfer_len=buf_len,
dxferp=buf_p,
cmdp=ctypes.addressof(ata_cmd),
sbp=ctypes.cast(sense, ctypes.c_void_p), timeout=1000,
flags=0, pack_id=0, usr_ptr=None, status=0, masked_status=0,
msg_status=0, sb_len_wr=0, host_status=0, driver_status=0,
resid=0, duration=0, info=0)
return sgio
Эта функция принимает ata команду, параметры и буферы, а возвращает готовую структуру для ioctl запроса. Дальше все просто. Создаем буфер в котором вернутся статус выполнения команды и содержимое ata регистров статуса и ошибки. Создаем буфер для сектора, прочитанного с диска. Заполняем структуры и выполняем нашу первую ata команду.
sense = ctypes.c_buffer(64)
identify = ctypes.c_buffer(512)
sgio = prepareSgio(0xec, 0, 0, 0, SG_DXFER_FROM_DEV, sense, identify) # IDENTIFY
with open(dev, 'r') as fd:
if fcntl.ioctl(fd, SG_IO, ctypes.addressof(sgio)) != 0:
return None # fcntl failed!
В ответ получаем сектор с информацией о диске:
0000000: 5a04 ff3f 37c8 1000 0000 0000 3f00 0000 Z..?7.......?...
0000010: 0000 0000 2020 2020 2020 4b4a 3131 3142 .... KJ111B
0000020: 3942 5647 4142 4659 0300 5fea 3800 4b4a 9BVGABFY.._.8.KJ
0000030: 4f41 3341 4145 6948 6174 6863 2069 5548 OA3AAEiHathc iUH
0000040: 3741 3232 3230 4130 414c 3333 2030 2020 7A2220A0AL33 0
0000050: 2020 2020 2020 2020 2020 2020 2020 1080 ..
0000060: 0040 002f 0040 0002 0002 0700 ff3f 1000 .@./.@.......?..
0000070: 3f00 10fc fb00 0001 ffff ff0f 0000 0700 ?...............
0000080: 0300 7800 7800 7800 7800 0000 0000 0000 ..x.x.x.x.......
0000090: 0000 0000 0000 1f00 0617 0000 5e00 4400 ............^.D.
00000a0: fc01 2900 6b34 697d 7347 6934 41bc 6347 ..).k4i}sGi4A.cG
00000b0: 7f40 0401 0000 0000 feff 0000 0000 0800 .@..............
00000c0: ca00 f900 1027 0000 b088 e0e8 0000 0000 .....'..........
00000d0: ca00 0000 0000 875a 0050 a2cc cb22 44fc .......Z.P..."D.
00000e0: 0000 0000 0000 0000 0000 0000 0000 1440 ...............@
00000f0: 1440 0000 0000 0000 0000 0000 0000 0000 .@..............
0000100: 0100 0b00 0000 0000 8020 f10d 20fa 0100 ......... .. ...
0000110: 0040 0404 0403 0000 0000 0502 0604 0504 .@..............
0000120: 0506 0803 0506 0504 0505 0603 0505 0000 ................
0000130: 3741 3342 0000 0a78 0000 bd5d d3a1 0080 7A3B...x...]....
0000140: 0000 0000 0000 0000 0000 0000 0000 0000 ................
0000150: 0200 0000 0000 0000 0000 0000 0000 0000 ................
0000160: 0000 0000 0000 0000 0000 0000 0000 0000 ................
0000170: 0000 0000 0000 0000 0000 0000 0000 0000 ................
0000180: 0000 0000 0000 0000 0000 0000 0000 0000 ................
0000190: 0000 0000 0000 0000 0000 0000 3d00 0000 ............=...
00001a0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00001b0: 0000 201c 0000 0000 0000 0000 1f10 2100 .. ...........!.
00001c0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00001d0: 0000 0000 0100 e003 0000 0000 0000 0000 ................
00001e0: 0000 0000 0000 0000 0000 0000 0000 0000 ................
00001f0: 0000 0000 0000 0000 0000 0000 0000 a503 ................
В нем содержится полная информация о диске, извлечем основную.
serial = swapString(identify[20:40])
firmware = swapString(identify[46:53])
model = swapString(identify[54:93])
sectors = int.from_bytes(identify[200] + identify[201] + identify[202] + identify[203] +
identify[204] + identify[205] + identify[206] + identify[207], byteorder='little')
В результате получаем:
модель: Hitachi HUA722020ALA330; прошивка: JKAOA3; серийный номер: JK11A1YAJE2N5V; число секторов: 3907029168.
Теперь мы умеем отправлять ata команды диску и получать от него ответы. Потихоньку результат моей работы оформился в библиотечку, содержащую реализацию основного набора ata команд, включая чтение SMART. Кому интересно можно взглянуть на нее здесь. Качество кода сильно не ругайте, я не волшебник программист я только учусь.
Теперь осталось с ее помощью написать утилиту тестирования. Чувствую меня ждет еще много открытий.