Python脚本-本地CSV快速傅里叶变换

Python 代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq
from scipy.signal import find_peaks

# ==========================================
# 【配置区域】请修改这里的信息
# ==========================================
# 1. 文件路径 (注意:Windows路径请用反斜杠 / 或 双斜杠 \\)
FILE_PATH = 'sensor_data.csv'  # 举例: 'D:/data/test_01.xlsx'

# 2. 你要分析的那一列的表头名称 (Excel第一行的名字)
COLUMN_NAME = 'Voltage'        # 举例: 'NOx_ppm' 或 'AI_01'

# 3. 采样率 (Hz): 每秒采集多少个点?
# 警告:这个数字错了,分析出来的频率全都是错的!
FS = 1000.0                    # 举例: 100Hz, 1000Hz

# ==========================================
# 代码逻辑开始
# ==========================================

# 设置绘图风格和字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.style.use('bmh') # 使用一种比较专业的科研绘图风格

def load_and_analyze():
    # --- 1. 读取数据 ---
    try:
        print(f"正在读取文件: {FILE_PATH} ...")
        if FILE_PATH.endswith('.csv'):
            df = pd.read_csv(FILE_PATH)
        elif FILE_PATH.endswith(('.xls', '.xlsx')):
            df = pd.read_excel(FILE_PATH)
        else:
            print("错误:只支持 .csv 或 .xlsx 格式文件")
            return
            
        # 检查列名是否存在
        if COLUMN_NAME not in df.columns:
            print(f"错误:在文件中找不到列名 '{COLUMN_NAME}'")
            print(f"文件包含的列名有: {list(df.columns)}")
            return
            
        # 提取数据并转为 numpy 数组
        # dropna() 是为了防止空值报错
        data_raw = df[COLUMN_NAME].dropna().values
        print(f"成功读取 {len(data_raw)} 个数据点")

    except FileNotFoundError:
        print("------------------------------------------------------")
        print(f"【没找到文件】: {FILE_PATH}")
        print("为了演示效果,我将自动生成一段带 50Hz 干扰的模拟数据...")
        print("------------------------------------------------------")
        # 生成模拟数据 (如果没找到文件)
        N_sim = 2000
        t_sim = np.linspace(0, N_sim/FS, N_sim)
        # 真实信号(2Hz) + 工频干扰(50Hz) + 随机噪声
        data_raw = 2.0 + 0.5*np.sin(2*np.pi*2*t_sim) + 0.2*np.sin(2*np.pi*50*t_sim) + np.random.normal(0, 0.1, N_sim)

    # --- 2. 预处理 (去直流/去偏移) ---
    # FFT 对直流分量(0Hz)非常敏感,如果不减去平均值,0Hz的柱子会通天高,
    # 导致你看不清后面的干扰。
    data_detrend = data_raw - np.mean(data_raw)

    # --- 3. 执行 FFT ---
    N = len(data_detrend)
    yf = fft(data_detrend)
    xf = fftfreq(N, 1 / FS)

    # 取前半部分 (正频率)
    x_plot = xf[:N//2]
    y_plot = np.abs(yf[:N//2]) * 2 / N

    # --- 4. 自动抓鬼 (寻找峰值) ---
    # 寻找频谱中显著的尖峰 (高度超过最大值的 10%)
    peaks, _ = find_peaks(y_plot, height=np.max(y_plot)*0.1)
    
    print("\n【诊断报告】")
    print("-" * 30)
    print("发现潜在干扰源 (能量Top 5):")
    # 对峰值按高度排序
    peak_indices = peaks[np.argsort(y_plot[peaks])[::-1]][:5] 
    
    for i in peak_indices:
        freq_val = x_plot[i]
        amp_val = y_plot[i]
        # 忽略接近 0Hz 的低频 (那是信号本身)
        if freq_val > 1.0: 
            print(f"-> 频率: {freq_val:.1f} Hz  (强度: {amp_val:.4f})")
            if 48 < freq_val < 52:
                print("   [警报] 疑似工频干扰 (电源/接地问题)")
            elif 98 < freq_val < 102:
                print("   [警报] 疑似整流纹波 (电源板滤波电容问题)")

    # --- 5. 绘图 ---
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

    # 图1: 时域 (原始波形)
    # 为了防止数据量太大卡死,只画前 1000 个点预览
    preview_N = min(1000, N)
    time_axis = np.arange(preview_N) / FS
    ax1.plot(time_axis, data_raw[:preview_N], 'k-', linewidth=1)
    ax1.set_title(f"原始信号波形 (仅显示前 {preview_N} 点)", fontsize=14)
    ax1.set_xlabel("时间 (s)")
    ax1.set_ylabel("幅值")

    # 图2: 频域 (FFT 结果)
    ax2.plot(x_plot, y_plot, 'r-', linewidth=1)
    # 标记出找到的干扰峰
    ax2.plot(x_plot[peaks], y_plot[peaks], 'bx', label='检测到的峰值')
    
    ax2.set_title("频谱分析 (FFT Spectrum) - 干扰侦查", fontsize=14)
    ax2.set_xlabel("频率 (Hz)")
    ax2.set_ylabel("干扰强度")
    ax2.set_xlim(0, FS/2) # 只显示到奈奎斯特频率
    ax2.legend()

    plt.tight_layout()
    plt.show()

if __name__ == "__main__":
    load_and_analyze()

这段代码做了三件贴心的事:

  1. 自动识别格式: 支持 .csv.xlsx (Excel)。
  2. 自动去直流 (Detrend): 很多传感器数据有一个固定的电压底数(比如 2.5V),如果不去掉,0Hz 的能量会特别大,把干扰信号淹没。代码会自动把数据“拉平”到 0 基准线。
  3. 自动抓鬼 (Peak Detection): 不仅画图,还会自动打印出能量最大的前 3 个频率点(告诉你干扰到底是多少 Hz)。

你的准备工作

你需要知道三个信息:

  1. 文件路径: 你的数据文件放在哪?(例如 D:/data/sensor_log.csv)
  2. 列名: 哪一列是你要分析的数据?(例如 voltageconcentration)
  3. 采样率 (Fs): 当时是以什么速度采的?(例如 1 秒 1 个点就是 1Hz,1 秒 100 个点就是 100Hz)。这个必须准,否则算出来的频率是错的。

程序输出

1
2
3
4
5
6
7
8
9
正在读取文件: cems_fault_data.csv ...
成功读取 5000 个数据点

诊断报告
------------------------------
发现潜在干扰源 (能量Top 5):
-> 频率: 2.0 Hz  (强度: 0.4991)
-> 频率: 50.0 Hz  (强度: 0.1984)
   [警报] 疑似工频干扰 (电源/接地问题)

Licensed under CC BY-NC-SA 4.0