1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq
from scipy.signal import find_peaks
# ==========================================
# 【配置区域】请修改这里的信息
# ==========================================
# 1. 文件路径 (注意:Windows路径请用反斜杠 / 或 双斜杠 \\)
FILE_PATH = 'sensor_data.csv' # 举例: 'D:/data/test_01.xlsx'
# 2. 你要分析的那一列的表头名称 (Excel第一行的名字)
COLUMN_NAME = 'Voltage' # 举例: 'NOx_ppm' 或 'AI_01'
# 3. 采样率 (Hz): 每秒采集多少个点?
# 警告:这个数字错了,分析出来的频率全都是错的!
FS = 1000.0 # 举例: 100Hz, 1000Hz
# ==========================================
# 代码逻辑开始
# ==========================================
# 设置绘图风格和字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.style.use('bmh') # 使用一种比较专业的科研绘图风格
def load_and_analyze():
# --- 1. 读取数据 ---
try:
print(f"正在读取文件: {FILE_PATH} ...")
if FILE_PATH.endswith('.csv'):
df = pd.read_csv(FILE_PATH)
elif FILE_PATH.endswith(('.xls', '.xlsx')):
df = pd.read_excel(FILE_PATH)
else:
print("错误:只支持 .csv 或 .xlsx 格式文件")
return
# 检查列名是否存在
if COLUMN_NAME not in df.columns:
print(f"错误:在文件中找不到列名 '{COLUMN_NAME}'")
print(f"文件包含的列名有: {list(df.columns)}")
return
# 提取数据并转为 numpy 数组
# dropna() 是为了防止空值报错
data_raw = df[COLUMN_NAME].dropna().values
print(f"成功读取 {len(data_raw)} 个数据点")
except FileNotFoundError:
print("------------------------------------------------------")
print(f"【没找到文件】: {FILE_PATH}")
print("为了演示效果,我将自动生成一段带 50Hz 干扰的模拟数据...")
print("------------------------------------------------------")
# 生成模拟数据 (如果没找到文件)
N_sim = 2000
t_sim = np.linspace(0, N_sim/FS, N_sim)
# 真实信号(2Hz) + 工频干扰(50Hz) + 随机噪声
data_raw = 2.0 + 0.5*np.sin(2*np.pi*2*t_sim) + 0.2*np.sin(2*np.pi*50*t_sim) + np.random.normal(0, 0.1, N_sim)
# --- 2. 预处理 (去直流/去偏移) ---
# FFT 对直流分量(0Hz)非常敏感,如果不减去平均值,0Hz的柱子会通天高,
# 导致你看不清后面的干扰。
data_detrend = data_raw - np.mean(data_raw)
# --- 3. 执行 FFT ---
N = len(data_detrend)
yf = fft(data_detrend)
xf = fftfreq(N, 1 / FS)
# 取前半部分 (正频率)
x_plot = xf[:N//2]
y_plot = np.abs(yf[:N//2]) * 2 / N
# --- 4. 自动抓鬼 (寻找峰值) ---
# 寻找频谱中显著的尖峰 (高度超过最大值的 10%)
peaks, _ = find_peaks(y_plot, height=np.max(y_plot)*0.1)
print("\n【诊断报告】")
print("-" * 30)
print("发现潜在干扰源 (能量Top 5):")
# 对峰值按高度排序
peak_indices = peaks[np.argsort(y_plot[peaks])[::-1]][:5]
for i in peak_indices:
freq_val = x_plot[i]
amp_val = y_plot[i]
# 忽略接近 0Hz 的低频 (那是信号本身)
if freq_val > 1.0:
print(f"-> 频率: {freq_val:.1f} Hz (强度: {amp_val:.4f})")
if 48 < freq_val < 52:
print(" [警报] 疑似工频干扰 (电源/接地问题)")
elif 98 < freq_val < 102:
print(" [警报] 疑似整流纹波 (电源板滤波电容问题)")
# --- 5. 绘图 ---
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# 图1: 时域 (原始波形)
# 为了防止数据量太大卡死,只画前 1000 个点预览
preview_N = min(1000, N)
time_axis = np.arange(preview_N) / FS
ax1.plot(time_axis, data_raw[:preview_N], 'k-', linewidth=1)
ax1.set_title(f"原始信号波形 (仅显示前 {preview_N} 点)", fontsize=14)
ax1.set_xlabel("时间 (s)")
ax1.set_ylabel("幅值")
# 图2: 频域 (FFT 结果)
ax2.plot(x_plot, y_plot, 'r-', linewidth=1)
# 标记出找到的干扰峰
ax2.plot(x_plot[peaks], y_plot[peaks], 'bx', label='检测到的峰值')
ax2.set_title("频谱分析 (FFT Spectrum) - 干扰侦查", fontsize=14)
ax2.set_xlabel("频率 (Hz)")
ax2.set_ylabel("干扰强度")
ax2.set_xlim(0, FS/2) # 只显示到奈奎斯特频率
ax2.legend()
plt.tight_layout()
plt.show()
if __name__ == "__main__":
load_and_analyze()
|