Фильтрация шума сигнала

image-loader.svg

Фильтрация шума очень важная вещь, при работе с различными датчиками. Сигнал, получаемый от них всегда приходит с шумами, и важно уметь их грамотно отфильтровать. Качественная фильтрация шума способна уменьшить погрешность и увеличить качество измерения датчика. Этим мы сегодня и займемся.

Примерами будут три графика — синусоида, квадратный сигнал и треугольный сигнал.

Код для вывода графиков

import matplotlib.pyplot as plt
from math import sin

# Generals variables
length = 30
resolution = 20

# Creating arrays with graphic
sinus_g = [sin(i / resolution) for i in range(length * resolution)]

square_g = [(1 if p > 0 else -1) for p in sinus_g]

triangle_g = []
t = -1
for _ in range(length * resolution):
    t = t+0.035 if t < 1 else -1
    triangle_g.append(t)

# Output of graphs
graphics = [sinus_g, square_g, triangle_g]

fig, axs = plt.subplots(3, 1)

for i in range(len(graphics)):
    axs[i].plot(graphics[i])

    axs[i].set_ylim([-2, 2]), axs[i].set_xlim([0, length*resolution]),
    axs[i].set_yticklabels([]), axs[i].set_xticklabels([])

plt.show()

синусоида, квадратный сигнал и треугольный сигналсинусоида, квадратный сигнал и треугольный сигнал

Шум делится на два типа: постоянный шум с относительно стабильной амплитудой и случайные выбросы, вызванные внешними факторами. Симулировать это мы будем данным образом.

Функция для добавление шума

def noised(func, k=0.3, prob=0.03):
    o = []
    for p in func:
        r = (np.random.random()*2-1) * k

        # Standard noise and random emissions
        if np.random.random() < prob: c = p + r*7
        else: c = p + r

        o.append(c)
    return o

image-loader.svg

Среднее арифметическое

Как по мне, самый очевидный способ минимизировать шум. Для его реализации потребуется ввести буфер нескольких предыдущих значений, каждый раз, когда опрашивается датчик буфер сдвигается (первый элемент удаляется, а новое значение датчика добавляется в конец или как-нибудь по другому, главное фиксированный размер буфера). От его размера и будет зависеть результат и быстродействие кода. С фильтрацией алгоритм справляется очень неплохо. Но его проблема заключается в производительности. Контроллеру приходится делать множество вычислений с плавающей точкой, что может сказаться на скорости выполнения кода. Но если датчик не следует очень часто опрашивать, то этот метод отлично подойдет.

Код фильтрация графика средним арифметическим

def arith_mean(f, buffer_size=10):
    # Creating buffer
    if not hasattr(arith_mean, "buffer"):
        arith_mean.buffer = [f] * buffer_size

    # Move buffer to actually values ( [0, 1, 2, 3] -> [1, 2, 3, 4] )
    arith_mean.buffer = arith_mean.buffer[1:]
    arith_mean.buffer.append(f)

    # Calculation arithmetic mean
    mean = 0
    for e in arith_mean.buffer: mean += e
    mean /= len(arith_mean.buffer)

    return mean

buffer_size = 7buffer_size = 7buffer_size = 25buffer_size = 25

Как мы видим, при увеличении буфера квадратный и треугольный сигналы сильно исказились, а синусоида сместилась. Проявляется запаздывание среднего значения. Так что, при использовании данного фильтра стоит аккуратнее подбирать размер буфера.

Медианный фильтр

Он скорее подойдет не как отдельный фильтр, а дополняющий к другому. Медианный фильтр отлично справляется со случайными выбросами. Если среднее арифметическое получая на вход (10, 12, 55), выдаст 25.67, то медиан выдаст 12. На первый взгляд не так просто понять как он устроен, но со своей задачей он справляется отлично. На просторах интернета я нашел лаконичное исполнение.

middle = (max(a,b) == max(b, c)) ? max(a, c) : max(b, min(a, c)); // c++
middle = max(a, c) if (max(a, b) == max(b, c)) else max(b, min(a, c)) # python

Код медианного фильтра

def median(f):
    # Creating buffer
    if not hasattr(median, "buffer"):
        median.buffer = [f] * 3

    # Move buffer to actually values ( [0, 1, 2] -> [1, 2, 3] )
    median.buffer = median.buffer[1:]
    median.buffer.append(f)

    # Calculation median
    a = median.buffer[0]
    b = median.buffer[1]
    c = median.buffer[2]
    middle = max(a, c) if (max(a, b) == max(b, c)) else max(b, min(a, c))

    return middle

Результат действия медианного фильтраРезультат действия медианного фильтра

Медианный фильтр отфильтровал почти все случайные выбросы. К тому же этот алгоритм совершенно прост в вычислении. И используя его в комбинации с каким-либо другим другим фильтром можно получить максимальный результат.

Экспоненциальное бегущее среднее и адаптивный коэффициент

Этот фильтр по своей сути схож с первым, а главное он более простой по вычислениям. Работает он так: к предыдущему фильтрованному значению прибавляется новое, и каждое из них помножено на собственный коэффициент, сумма которых равна 1. Коэффициент k подбирается от 0 до 1 и означает важность нового значения по сравнению с предыдущем, то есть чем больше k, тем больше важность нового нефильтрованного значения и фильтрованный график ближе к изначальному.

normalised = new * k + normalised * (1-k) # Эта формула
normalised += (new - normalised) * k # А лучше эта

Адаптивный коэффициент нужен для корректной фильтрации квадратных сигналов. Реализуется он в одну строчку и означает, что если предыдущее фильтрованное значение слишком далеко от нового (то есть корректный сигнал изменился), то k увеличиваем чтобы момент смены сигнала был четко виден, а иначе k оставляем нормальным.

k = s_k if (abs(new - normalised) < d) else max_k

Код фильтра упрощенного бегущего среднего с адаптивным коэффициент

def easy_mean(f, s_k=0.2, max_k=0.9, d=1.5):
    # Creating static variable
    if not hasattr(easy_mean, "fit"):
        easy_mean.fit = f

    # Adaptive ratio
    k = s_k if (abs(f - easy_mean.fit) < d) else max_k

    # Calculation easy mean
    easy_mean.fit += (f - easy_mean.fit) * k

    return easy_mean.fit

image-loader.svg

Работает достаточно хорошо, но из-за адаптивного коэффициента появляются некие артефакты в моментах с выбросами, так как они достаточно велики, чтобы определиться как составляющие квадратного сигнала. С этим можно бороться, увеличивая параметр d (требуемое расстояние для определения квадратного сигнала), но есть решение элегантнее. И тут нам как раз нам потребуется медианный фильтр. Если перед подачей данных на бегущее среднее, прогнать их через медиану, то можно получить качественный и быстродейственный алгоритм для фильтрации сигнала.

с использованием медианного фильтрас использованием медианного фильтра

Все артефакты исчезли. Данная связка алгоритмов является одной из самых эффективных и быстродейственных. Она может быть применена практически везде.

Применение связки фильтров

def normalise(func):
    o = []
    for p in func:
        res = median(p)
        res = easy_mean(res)

        o.append(res)
    return o

Фильтр Калмана

Еще один фильтр, принцип работы которого не столь очевиден для каждого. Из всего кода вам необходимо знать, что r — это приблизительная амплитуда шума, а q следует подбирать самостоятельно. Если вы хотите разобраться в его устройстве поподробнее, можете прочитать эту статью с википедии, а код я взял и отредактировал с этой страницы.

Код фильтра Калмана

def kalman(f, q=0.25, r=0.7):
    if not hasattr(kalman, "Accumulated_Error"):
        kalman.Accumulated_Error = 1
        kalman.kalman_adc_old = 0

    if abs(f-kalman.kalman_adc_old)/50 > 0.25:
        Old_Input = f*0.382 + kalman.kalman_adc_old*0.618
    else:
        Old_Input = kalman.kalman_adc_old

    Old_Error_All = (kalman.Accumulated_Error**2 + q**2)**(1/2)
    H = Old_Error_All**2/(Old_Error_All**2 + r**2)
    kalman_adc = Old_Input + H * (f - Old_Input)
    kalman.Accumulated_Error = ((1 - H)*Old_Error_All**2)**(1/2)
    kalman.kalman_adc_old = kalman_adc

    return kalman_adc

image-loader.svg

Со своей задачей фильтр справляется, но он не всегда подойдет из-за множества вычислений с плавающей точкой.

Какой фильтр выбрать?

Это зависит от обстоятельств в которых фильтр будет использоваться. Естественно стоит попробовать каждый, но практически для каждого случая подойдёт вариант №2 из списка ниже. Напомню, что наиболее выгодная связка — медианного фильтра с другим. Пройдемся вкратце по перечисленным фильтрам из данной статьи:

  1. Медианный фильтр — самый быстродейственный алгоритм, применятся для отсеивания случайных выбросов, наиболее эффективен в связках с другими алгоритмами.

  2. Экспоненциальное бегущее среднее с адаптивным коэффициент — универсальный, и простой фильтр, подойдет в большинстве ситуаций

  3. Среднее арифметическое — эффективный, но не столь быстродейственный алгоритм

  4. Фильтр Калмана — универсальный способ фильтрации любого сигнала, но громоздкий по вычислениям

Кстати, если вы работаете со средой arduino, то там присутствует удобный инструмент визуализации графиков из значений переданных в порт, статья об этом. Это может упростить работу с сигналами.

И на последок исходники кодов, и еще несколько примеров…

Код для визуализация графиков

import matplotlib.pyplot as plt
from math import sin
from _f import *

# Generals variables
length = 30
resolution = 20

# Creating arrays with graphic
sinus_g = [sin(i / resolution) for i in range(length * resolution)]

square_g = [(1 if p > 0 else -1) for p in sinus_g]

triangle_g = []
t = -1
for _ in range(length * resolution):
    t = t+0.035 if t < 1 else -1
    triangle_g.append(t)

# Output of graphs
graphics = [sinus_g, square_g, triangle_g]

fig, axs = plt.subplots(3, 1)

for i in range(len(graphics)):
    noised_f = noised(graphics[i])

    axs[i].plot(noised_f, color='blue')
    axs[i].plot(graphics[i], color='black')
    axs[i].plot(normalise(noised_f), linewidth=3, color='red')

    axs[i].set_ylim([-2, 2]), axs[i].set_xlim([0, length*resolution]),
    axs[i].set_yticklabels([]), axs[i].set_xticklabels([])

plt.show()

Код с фильтрами

import numpy as np

np.random.seed(58008)


def normalise(func):
    o = []
    for p in func:
        res = median(p)
        res = easy_mean(res)

        o.append(res)
    return o


def noised(func, k=0.3, fitob=0.03):
    o = []
    for p in func:
        r = (np.random.random()*2-1) * k

        # Standard noise and random emissions
        if np.random.random() < fitob: c = p + r*7
        else: c = p + r

        o.append(c)
    return o


def arith_mean(f, buffer_size=10):
    # Creating buffer
    if not hasattr(arith_mean, "buffer"):
        arith_mean.buffer = [f] * buffer_size

    # Move buffer to actually values ( [0, 1, 2, 3] -> [1, 2, 3, 4] )
    arith_mean.buffer = arith_mean.buffer[1:]
    arith_mean.buffer.append(f)

    # Calculation arithmetic mean
    mean = 0
    for e in arith_mean.buffer: mean += e
    mean /= len(arith_mean.buffer)

    return mean


def easy_mean(f, s_k=0.2, max_k=0.9, d=1.5):
    # Creating static variable
    if not hasattr(easy_mean, "fit"):
        easy_mean.fit = f

    # Adaptive ratio
    k = s_k if (abs(f - easy_mean.fit) < d) else max_k

    # Calculation easy mean
    easy_mean.fit += (f - easy_mean.fit) * k

    return easy_mean.fit


def median(f):
    # Creating buffer
    if not hasattr(median, "buffer"):
        median.buffer = [f] * 3

    # Move buffer to actually values ( [0, 1, 2] -> [1, 2, 3] )
    median.buffer = median.buffer[1:]
    median.buffer.append(f)

    # Calculation median
    a = median.buffer[0]
    b = median.buffer[1]
    c = median.buffer[2]
    middle = max(a, c) if (max(a, b) == max(b, c)) else max(b, min(a, c))

    return middle


def kalman(f, q=0.25, r=0.7):
    if not hasattr(kalman, "Accumulated_Error"):
        kalman.Accumulated_Error = 1
        kalman.kalman_adc_old = 0

    if abs(f-kalman.kalman_adc_old)/50 > 0.25:
        Old_Input = f*0.382 + kalman.kalman_adc_old*0.618
    else:
        Old_Input = kalman.kalman_adc_old

    Old_Error_All = (kalman.Accumulated_Error**2 + q**2)**(1/2)
    H = Old_Error_All**2/(Old_Error_All**2 + r**2)
    kalman_adc = Old_Input + H * (f - Old_Input)
    kalman.Accumulated_Error = ((1 - H)*Old_Error_All**2)**(1/2)
    kalman.kalman_adc_old = kalman_adc

    return kalman_adc

фильтрация сильного шума с помощью kalman(p, r=3, q=0.4), arith_mean(res, buffer_size=5)фильтрация сильного шума с помощью kalman (p, r=3, q=0.4), arith_mean (res, buffer_size=5)просто линейный шум, нормализованный средним арифметическим с буфером в 20просто линейный шум, нормализованный средним арифметическим с буфером в 20

Ещё интересный эксперимент: я построчно загрузил зашумленную картинку своего кота и пропустил её через фильтр Калмана.

image-loader.svg

© Habrahabr.ru