{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Funktionen und Abhängigkeiten\n", "\n", "import time\n", "import os\n", "from ipywidgets import interact\n", "from scipy import signal\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "import soundfile as sf\n", "\n", "def load_wav(filename):\n", " y, fs = sf.read(filename, dtype='float32')\n", " return fs, y.T\n", "\n", "# ANR Funtkion\n", "def lms_filter(input, ref_noise, coefficients, mu, adaption_step = 1):\n", " coefficient_matrix = np.zeros((len(input), coefficients), dtype=np.float32)\n", " output=np.zeros(input.shape[0], dtype=np.float32)\n", " filter = np.zeros(coefficients, dtype=np.float32)\n", " \n", " for j in range(0, len(input) - len(filter)): \n", " accumulator=0\n", " for i in range(coefficients):\n", " noise=ref_noise[j+i]\n", " accumulator+=filter[i] * noise\n", " output[j] = input[j] - accumulator\n", " corrector = mu * output[j]\n", " if (j % adaption_step) == 0:\n", " for k in range(coefficients):\n", " filter[k] += corrector*ref_noise[j+k] \n", " coefficient_matrix[j, :] = filter[:]\n", " return output, coefficient_matrix\n", "\n", "# Transferfunktion mit kontinuierlichem Abklingen und dann abruptem Stop\n", "def transfer_function(input, n):\n", " continuous_decay = np.linspace(1.0, 0.1, n)\n", " output = np.zeros(n) \n", " for i in range(0, len(output)):\n", " if i < len(output)/2:\n", " output[i] = input[i] * continuous_decay[i]\n", " else:\n", " output[i] = 0\n", " return output\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Plots für Simple Usecases\n", "\n", "plot = 'noise'\n", "\n", "# Chirp Generator\n", "n=2000 #Sampleanzahl\n", "fs=20000 #Samplingrate\n", "f0=100 #Startfrequenz\n", "f1=1000 #Stopfrequenz\n", "t1=n/fs #Chirpdauer (Samples/Samplingrate)\n", "if plot == 'sine_1':\n", " f_disturber=2000 #Störfrequenz\n", "else:\n", " f_disturber=500 #Störfrequenz\n", "\n", "signal_amplitude=1\n", "disturber_amplitude=0.5\n", "\n", "coefficients = 16\n", "step_size = 0.01\n", "indices = [0, coefficients // 2, coefficients - 1]\n", "\n", "t = np.linspace(0, t1, n)\n", "\n", "# Zielsignal anlegen\n", "target_signal = signal.chirp(t, f0=f0, f1=f1, t1=t1, method='linear')*signal_amplitude\n", "\n", "# Störsignal anlegen\n", "if plot == 'sine_1' or plot == 'sine_2':\n", " noise_signal = np.sin(2*np.pi*f_disturber*t) * disturber_amplitude\n", "else:\n", " noise_signal = np.random.normal(0, 1, n) * disturber_amplitude\n", "\n", "# Korruptes Zielsignal anlegen\n", "corrupted_target_signal = target_signal + noise_signal\n", "\n", "t = np.linspace(0, len(corrupted_target_signal), len(corrupted_target_signal))\n", "output, coefficient_matrix = lms_filter(corrupted_target_signal, noise_signal, coefficients, step_size, adaption_step=1)\n", "\n", "\n", "# Koeffizientenmatrix und Vergleich um Koeffizientenanzahl kürzen, um Tail zu vermeiden, 2.te Zeitachse anlegen\n", "coefficient_matrix = coefficient_matrix[:-coefficients]\n", "comparison = (output - target_signal)[:-coefficients]\n", "MSE = round(np.square(np.subtract(target_signal,output)).mean(),3)\n", "t2 = np.linspace(0, len(comparison), len(comparison))\n", "\n", "# Schriftgrößen für LaTeX-Dokument\n", "plt.rcParams.update({\n", " 'font.size': 14, # Standardtext\n", " 'axes.titlesize': 16, # Titel\n", " 'axes.labelsize': 15, # Achsenbeschriftungen\n", " 'xtick.labelsize': 13, # Tick-Beschriftungen\n", " 'ytick.labelsize': 13,\n", " 'legend.fontsize': 13 # Legende\n", "})\n", "\n", "# Plot des clean Target Signals\n", "figure0, ax0 = plt.subplots(1, 1, figsize=(15, 3.5))\n", "ax0.set_ylim(min(corrupted_target_signal), max(corrupted_target_signal))\n", "ax0.plot(t, target_signal, c='deepskyblue')\n", "ax0.set_title('Clean target signal')\n", "ax0.set_xlabel('time(ms)')\n", "ax0.set_ylabel('Amplitude')\n", "\n", "# Plots des Filterprozesses\n", "figure1, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(15, 9), sharex=True, sharey=True)\n", "ax1.set_ylim(min(corrupted_target_signal), max(corrupted_target_signal))\n", "ax1.plot(t, corrupted_target_signal, c='royalblue')\n", "ax2.plot(t, noise_signal, c='chocolate')\n", "ax3.plot(t, output, c='green')\n", "\n", "ax1.set_title('Target signal sensor')\n", "ax2.set_title('Noise signal sensor')\n", "ax3.set_title('Filter output')\n", "\n", "ax3.set_xlabel('time(ms)')\n", "ax1.set_ylabel('Amplitude')\n", "ax2.set_ylabel('Amplitude')\n", "ax3.set_ylabel('Amplitude')\n", "ax1.legend(['Corrputed target signal'])\n", "ax2.legend(['Noise signal'])\n", "ax3.legend([f'{coefficients} Coefficients, {step_size} Stepsize'], loc='upper right')\n", "\n", "# Plots der Filterperfomanz\n", "figure2, (ax4, ax5) = plt.subplots(2, 1, figsize=(15, 6), sharex=True)\n", "ax4.set_ylim(min(corrupted_target_signal), max(corrupted_target_signal))\n", "ax4.plot(t2, comparison, c='purple')\n", "for i in indices:\n", " ax5.plot(t2, coefficient_matrix[:,i])\n", "ax4.set_title('Error Target signal / Filter output')\n", "ax5.set_title('Coeffcient convergence')\n", "\n", "ax5.set_xlabel('time(ms)')\n", "ax4.set_ylabel('Error')\n", "ax5.set_ylabel('Coeffcient value')\n", "ax4.legend([f'MSE = {MSE}'], loc='upper right')\n", "ax5.legend(['First coeffcient','Medium coefficient','Last coefficient'], loc='upper right')\n", "\n", "figure0.tight_layout()\n", "figure1.tight_layout()\n", "figure2.tight_layout()\n", "figure0.savefig(f'fig_target_signal_simple', dpi=600)\n", "figure1.savefig(f'fig_plot_1_{plot}', dpi=600)\n", "figure2.savefig(f'fig_plot_2_{plot}', dpi=600)\n", "plt.show()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Plots für intermediate Usecases\n", "\n", "# Chirp Generator\n", "n=2000 #Sampleanzahl\n", "fs=20000 #Samplingrate\n", "f0=100 #Startfrequenz\n", "f1=1000 #Stopfrequenz\n", "t1=n/fs #Chirpdauer (Samples/Samplingrate)\n", "f_disturber=2000 #Störfrequenz\n", "\n", "# .wav File laden, Tonspuren den Signalen zuordnen, Corrputed Target Signal erstellen\n", "fs, data = load_wav(f'./testdata/input/breathing_peak_01g_external_Speaker_80dBSPL_PDM.wav')\n", "target_signal = data[1]\n", "noise_signal = data[0]\n", "corrupted_target_signal = target_signal + noise_signal\n", "\n", "coefficients = 16\n", "step_size = 0.01\n", "indices = [0, coefficients // 2, coefficients - 1]\n", "\n", "t = np.linspace(0, len(corrupted_target_signal), len(corrupted_target_signal))\n", "output, coefficient_matrix = lms_filter(corrupted_target_signal, noise_signal, coefficients, step_size, adaption_step=1)\n", "\n", "# Audiodateien zum Vergleich abspeichern\n", "# sf.write('corrupted_target_signal.wav', corrupted_target_signal, fs)\n", "# sf.write('filter_output.wav', output, fs)\n", "\n", "# Koeffizientenmatrix und Vergleich um Koeffizientenanzahl kürzen, um Tail zu vermeiden, 2.te Zeitachse anlegen\n", "coefficient_matrix = coefficient_matrix[:-coefficients]\n", "comparison = (output - target_signal)[:-coefficients]\n", "MSE = round(np.square(np.subtract(target_signal,output)).mean(),3)\n", "t2 = np.linspace(0, len(comparison), len(comparison))\n", "\n", "# Schriftgrößen für LaTeX-Dokument\n", "plt.rcParams.update({\n", " 'font.size': 14, # Standardtext\n", " 'axes.titlesize': 16, # Titel\n", " 'axes.labelsize': 15, # Achsenbeschriftungen\n", " 'xtick.labelsize': 13, # Tick-Beschriftungen\n", " 'ytick.labelsize': 13,\n", " 'legend.fontsize': 13 # Legende\n", "})\n", "\n", "# Plot des clean Target Signals\n", "figure0, ax0 = plt.subplots(1, 1, figsize=(15, 3.5))\n", "ax0.set_ylim(min(corrupted_target_signal), max(corrupted_target_signal))\n", "ax0.plot(t, target_signal, c='deepskyblue')\n", "ax0.set_ylim(min(corrupted_target_signal), max(corrupted_target_signal))\n", "ax0.set_title('Clean target signal')\n", "ax0.set_xlabel('time(ms)')\n", "ax0.set_ylabel('Amplitude')\n", "\n", "# Plots des Filterprozesses\n", "figure1, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(15, 9), sharex=True, sharey=True)\n", "ax1.set_ylim(min(corrupted_target_signal), max(corrupted_target_signal))\n", "ax1.plot(t, corrupted_target_signal, c='royalblue')\n", "ax2.plot(t, noise_signal, c='chocolate')\n", "ax3.plot(t, output, c='green')\n", "\n", "ax1.set_title('Target signal sensor')\n", "ax2.set_title('Noise signal sensor')\n", "ax3.set_title('Filter output')\n", "\n", "ax3.set_xlabel('time(ms)')\n", "ax1.set_ylabel('Amplitude')\n", "ax2.set_ylabel('Amplitude')\n", "ax3.set_ylabel('Amplitude')\n", "ax1.legend(['Corrputed target signal'])\n", "ax2.legend(['Noise signal'])\n", "ax3.legend([f'{coefficients} Coefficients, {step_size} Stepsize'], loc='upper right')\n", "\n", "# Plots der Filterperfomanz\n", "figure2, (ax4, ax5) = plt.subplots(2, 1, figsize=(15, 6), sharex=True)\n", "ax4.set_ylim(min(corrupted_target_signal), max(corrupted_target_signal))\n", "ax4.plot(t2, comparison, c='purple')\n", "for i in indices:\n", " ax5.plot(t2, coefficient_matrix[:,i])\n", "ax4.set_title('Error Target signal / Filter output')\n", "ax5.set_title('Coeffcient convergence')\n", "\n", "ax5.set_xlabel('time(ms)')\n", "ax4.set_ylabel('Error')\n", "ax5.set_ylabel('Coeffcient value')\n", "ax4.legend([f'MSE = {MSE}'], loc='upper right')\n", "ax5.legend(['First coeffcient','Medium coefficient','Last coefficient'], loc='upper right')\n", "\n", "figure0.tight_layout()\n", "figure1.tight_layout()\n", "figure2.tight_layout()\n", "figure0.savefig(f'fig_target_signal_intermediate', dpi=600)\n", "figure1.savefig(f'fig_plot_1_wav', dpi=600)\n", "figure2.savefig(f'fig_plot_2_wav', dpi=600)\n", "plt.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Plots für komplexen Usecases\n", "\n", "# Chirp Generator\n", "n=2000 #Sampleanzahl\n", "fs=20000 #Samplingrate\n", "f0=100 #Startfrequenz\n", "f1=1000 #Stopfrequenz\n", "t1=n/fs #Chirpdauer (Samples/Samplingrate)\n", "f_disturber=2000 #Störfrequenz\n", "\n", "# .wav File laden, Tonspuren den Signalen zuordnen, Corrputed Target Signal erstellen, Reduced Noise Signal erstellen\n", "fs, data = load_wav(f'./testdata/input/breathing_peak_01g_external_Speaker_80dBSPL_PDM.wav')\n", "target_signal = data[1]\n", "noise_signal = data[0]\n", "reduced_noise_signal = transfer_function(noise_signal, len(noise_signal))\n", "corrupted_target_signal = target_signal + reduced_noise_signal\n", "\n", "coefficients = 16\n", "step_size = 0.01\n", "indices = [0, coefficients // 2, coefficients - 1]\n", "\n", "t = np.linspace(0, len(corrupted_target_signal), len(corrupted_target_signal))\n", "output, coefficient_matrix = lms_filter(corrupted_target_signal, noise_signal, coefficients, step_size, adaption_step=1)\n", "\n", "# Audiodateien zum Vergleich abspeichern\n", "sf.write('corrupted_target_signal.wav', corrupted_target_signal, fs)\n", "sf.write('filter_output.wav', output, fs)\n", "\n", "# Koeffizientenmatrix und Vergleich um Koeffizientenanzahl kürzen, um Tail zu vermeiden, 2.te Zeitachse anlegen\n", "coefficient_matrix = coefficient_matrix[:-coefficients]\n", "comparison = (output - target_signal)[:-coefficients]\n", "MSE = round(np.square(np.subtract(target_signal,output)).mean(),3)\n", "t2 = np.linspace(0, len(comparison), len(comparison))\n", "\n", "# Schriftgrößen für LaTeX-Dokument\n", "plt.rcParams.update({\n", " 'font.size': 14, # Standardtext\n", " 'axes.titlesize': 16, # Titel\n", " 'axes.labelsize': 15, # Achsenbeschriftungen\n", " 'xtick.labelsize': 13, # Tick-Beschriftungen\n", " 'ytick.labelsize': 13,\n", " 'legend.fontsize': 13 # Legende\n", "})\n", "\n", "# Plot des clean Target Signals\n", "figure0, ax0 = plt.subplots(1, 1, figsize=(15, 3.5))\n", "ax0.set_ylim(min(corrupted_target_signal), max(corrupted_target_signal))\n", "ax0.plot(t, target_signal, c='deepskyblue')\n", "ax0.set_ylim(min(corrupted_target_signal), max(corrupted_target_signal))\n", "ax0.set_title('Clean target signal')\n", "ax0.set_xlabel('time(ms)')\n", "ax0.set_ylabel('Amplitude')\n", "\n", "# Plots des Filterprozesses\n", "figure1, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(15, 9), sharex=True, sharey=True)\n", "ax1.set_ylim(min(corrupted_target_signal), max(corrupted_target_signal))\n", "ax1.plot(t, corrupted_target_signal, c='royalblue')\n", "ax2.plot(t, noise_signal, c='chocolate')\n", "ax3.plot(t, output, c='green')\n", "\n", "ax1.set_title('Target signal sensor')\n", "ax2.set_title('Noise signal sensor')\n", "ax3.set_title('Filter output')\n", "\n", "ax3.set_xlabel('time(ms)')\n", "ax1.set_ylabel('Amplitude')\n", "ax2.set_ylabel('Amplitude')\n", "ax3.set_ylabel('Amplitude')\n", "ax1.legend(['Corrputed target signal'])\n", "ax2.legend(['Noise signal'])\n", "ax3.legend([f'{coefficients} Coefficients, {step_size} Stepsize'], loc='upper right')\n", "\n", "# Plots der Filterperfomanz\n", "figure2, (ax4, ax5) = plt.subplots(2, 1, figsize=(15, 6), sharex=True)\n", "ax4.set_ylim(min(corrupted_target_signal), max(corrupted_target_signal))\n", "ax4.plot(t2, comparison, c='purple')\n", "for i in indices:\n", " ax5.plot(t2, coefficient_matrix[:,i])\n", "ax4.set_title('Error Target signal / Filter output')\n", "ax5.set_title('Coeffcient convergence')\n", "\n", "ax5.set_xlabel('time(ms)')\n", "ax4.set_ylabel('Error')\n", "ax5.set_ylabel('Coeffcient value')\n", "ax4.legend([f'MSE = {MSE}'], loc='upper right')\n", "ax5.legend(['First coeffcient','Medium coefficient','Last coefficient'], loc='upper right')\n", "\n", "figure0.tight_layout()\n", "figure1.tight_layout()\n", "figure2.tight_layout()\n", "figure0.savefig(f'fig_target_signal_intermediate', dpi=600)\n", "figure1.savefig(f'fig_plot_1_wav', dpi=600)\n", "figure2.savefig(f'fig_plot_2_wav', dpi=600)\n", "plt.show()" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13" } }, "nbformat": 4, "nbformat_minor": 2 }