{ "cells": [ { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import time\n", "import os\n", "from ipywidgets import interact\n", "from scipy import signal\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "import soundfile as sf\n", "from numba import njit, jit\n", "\n", "\n", "# Chirp Generator\n", "n=2000 #Sampleanzahl\n", "fs=20000 #Samplingrate\n", "f0=100 #Startfrequenz\n", "f1=1000 #Stopfrequenz\n", "t1=n/fs #Chirpdauer (Samples/Samplingrate)\n", "f_disturber=2000 #Störfrequenz\n", "\n", "disturber_amplitude=0.3\n", "chirp_disturber_full_amp=0.6\n", "\n", "t = np.linspace(0, t1, n)\n", "\n", "# Chirp anlegen mit Amplitude\n", "y_signal = signal.chirp(t, f0=f0, f1=f1, t1=t1, method='linear')\n", "y_signal = (chirp_disturber_full_amp-disturber_amplitude)*y_signal#*(2**(15-1))\n", "\n", "# Störsinus anlegen, wird auf Chirp Signal addiert\n", "y_disturber_sine = np.sin(2*np.pi*f_disturber*t)\n", "y_disturber_sine = y_disturber_sine * disturber_amplitude #* (2**(15-1))\n", "y_signal_disturber_sine = y_signal + y_disturber_sine\n", "\n", "# Störrauschen anlegen, wird auf Chirp Signal addiert\n", "y_disturber_noise = np.random.normal(0, 0.2, n)\n", "y_disturber_noise = y_disturber_noise * disturber_amplitude #* (2**(15-1))\n", "y_signal_disturber_noise = y_signal + y_disturber_noise\n", "\n", "def load_wav(filename):\n", " y, fs = sf.read(filename, dtype='float32')\n", " return fs, y.T" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "ename": "FileNotFoundError", "evalue": "[WinError 3] The system cannot find the path specified: './lpdsp32/cSensorSignalProcessing/test/testdata/input'", "output_type": "error", "traceback": [ "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[1;31mFileNotFoundError\u001b[0m Traceback (most recent call last)", "Cell \u001b[1;32mIn[3], line 29\u001b[0m\n\u001b[0;32m 27\u001b[0m wav_folder_path \u001b[38;5;241m=\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m./lpdsp32/cSensorSignalProcessing/test/testdata/input\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[0;32m 28\u001b[0m \u001b[38;5;66;03m# Get a list of all files in the folder\u001b[39;00m\n\u001b[1;32m---> 29\u001b[0m file_names \u001b[38;5;241m=\u001b[39m [f \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m \u001b[43mos\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mlistdir\u001b[49m\u001b[43m(\u001b[49m\u001b[43mwav_folder_path\u001b[49m\u001b[43m)\u001b[49m \u001b[38;5;28;01mif\u001b[39;00m f\u001b[38;5;241m.\u001b[39mendswith(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m.wav\u001b[39m\u001b[38;5;124m\"\u001b[39m)]\n\u001b[0;32m 30\u001b[0m data_sel\u001b[38;5;241m=\u001b[39m[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mchirp_sine\u001b[39m\u001b[38;5;124m'\u001b[39m, \u001b[38;5;124m'\u001b[39m\u001b[38;5;124mchirp_noise\u001b[39m\u001b[38;5;124m'\u001b[39m] \u001b[38;5;241m+\u001b[39m file_names\n\u001b[0;32m 32\u001b[0m \u001b[38;5;66;03m# setup the plot\u001b[39;00m\n", "\u001b[1;31mFileNotFoundError\u001b[0m: [WinError 3] The system cannot find the path specified: './lpdsp32/cSensorSignalProcessing/test/testdata/input'" ] } ], "source": [ "%matplotlib widget\n", "\n", "@njit\n", "def lms_fir(data, ref_noise, N_coeffs, mu, scale_bits=31, adaption_step = 1, fix_point=False):\n", " coeffs_matrix = np.zeros((len(data), N_coeffs), dtype=np.float32) #2d Koeffizientenmatrix anlegen, Zeilen->Samples, Spalten -> Koeffizienten für Plot abspeichern\n", " out=np.zeros(data.shape[0], dtype=np.float32) #1d Ausgabematrix anlegen, selbes Format wie Input-Daten\n", " coeffs = np.zeros(N_coeffs, dtype=np.float32) #1d Filtermatrix anlegen, Länge ist Anzahl der Koeffizienten\n", " tap_buffer = np.zeros(N_coeffs, dtype=np.float32) #1d Tap Buffer Matrix anlegen - warum?\n", " \n", " # Interate over the data\n", " for j in range(0, len(data) - len(coeffs)): \n", " acc=0\n", " for i in range(N_coeffs): #iterate over the coefficients to calculate the filter and get the canceller\n", " noise=ref_noise[j+i]\n", " tap_buffer[i] = noise\n", " acc+=coeffs[i] * noise\n", " out[j] = data[j]-acc #Ausgabesignal = Eingangssignal - gefiltertes Störsignal?\n", " prod = mu*out[j]\n", " if (j % adaption_step) == 0:\n", " for i in range(N_coeffs):\n", " coeffs[i] += prod*tap_buffer[i] \n", " coeffs_matrix[j, :] = coeffs[:] #Koeffizienten in Matrix abspeichern für Plot\n", " return out, coeffs_matrix\n", "\n", "\n", "# load the wav files names\n", "wav_folder_path = \"./lpdsp32/cSensorSignalProcessing/test/testdata/input\"\n", "# Get a list of all files in the folder\n", "file_names = [f for f in os.listdir(wav_folder_path) if f.endswith(\".wav\")]\n", "data_sel=['chirp_sine', 'chirp_noise'] + file_names\n", "\n", "# setup the plot\n", "cols = 1\n", "rows = 4\n", "fig=plt.figure(figsize=(11, 6))\n", "plot1 = plt.subplot2grid( (rows, cols), (0,0), 1)\n", "line1 = plot1.plot([0], label=\"Chirp with noise\")\n", "plot2 = plt.subplot2grid((rows,cols),(1,0), sharex=plot1)\n", "line2 = plot2.plot([0], label=\"noise\")\n", "plot3 = plt.subplot2grid((rows, cols), (2,0), sharex=plot1)\n", "line3 = plot3.plot([0], label=\"output\")\n", "plot4 = plt.subplot2grid((rows, cols), (3,0), sharex=plot1)\n", "line4 = plot4.plot([0], label=\"filter coefficients\")\n", "\n", "\n", "def update(\n", " data_sel, num_coeff = 128, mu=0.01, \n", " adaption_step=1\n", " ):\n", " start0 = time.time()\n", " global y_signal_disturber, y_disturber\n", "\n", " if data_sel == \"chirp_sine\":\n", " y_s_d = y_signal_disturber_sine.astype(np.float32)\n", " y_d = y_disturber_sine.astype(np.float32)\n", " elif data_sel == \"chirp_noise\":\n", " y_s_d = y_signal_disturber_noise.astype(np.float32)\n", " y_d = y_disturber_noise.astype(np.float32)\n", " else:\n", " fs, data = load_wav(f\"{wav_folder_path}/{data_sel}\")\n", " y_s_d = data[1]\n", " y_d = data[0]\n", "\n", " # plot the chirp with noise\n", " line1[0].set_data(range(len(y_s_d)), y_s_d)\n", " plot1.set_xlim(0, len(y_s_d))\n", " plot1.set_ylim(min(y_s_d), max(y_s_d))\n", "\n", " # Plot the noise\n", " data = y_d\n", " line2[0].set_data(range(len(data)), data)\n", " plot2.set_xlim(0, len(data))\n", " plot2.set_ylim(min(data), max(data))\n", "\n", " #Plot the result\n", " start1 = time.time()\n", " data, coeffs_matrix = lms_fir(y_s_d, y_d, num_coeff, mu, \n", " #scale_bits=scale_bits, fix_point=fix_point\n", " adaption_step=adaption_step, \n", " )\n", " end1 = time.time()\n", " print(\"Elapsed (for fir_lms) = %s\" % round(end1 - start1, 3))\n", " line3[0].set_data(range(len(data)), data)\n", " plot3.set_xlim(0, len(data))\n", " plot3.set_ylim(np.min(data), np.max(data))\n", "\n", " #plot the coefficients progress\n", " plot4.clear()\n", " plot4.set_title(f\"mu={round(mu,4)}, N_coeffs={num_coeff}\")\n", " for i in range(coeffs_matrix.shape[1]):\n", " data = coeffs_matrix[:, i]\n", " plot4.plot(range(data.size), data, label=\"coeff {}\".format(i))\n", " plot4.set_xlim(0, data.size)\n", " plot4.autoscale(axis='y')\n", " plot4.legend(bbox_to_anchor=(1,1), loc=\"upper left\")\n", " fig.canvas.draw_idle()\n", " #plt.tight_layout()\n", " end0 = time.time()\n", " print(\"Elapsed (for update function) = %s\" % round(end0 - start0, 3))\n", "\n", "interact(update, data_sel=data_sel ,num_coeff=(0, 256, 2), mu=(0.001, 0.1, 0.001), \n", " #fix_point=False, scale_bits=(1, 31, 1), \n", " adaption_step=(1 , 128 ,1)\n", " )\n" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13" } }, "nbformat": 4, "nbformat_minor": 2 }