카테고리 없음
실전프로젝트 - 구간분할 및 파생변수 추가에 대한 검정
iron-min
2025. 12. 15. 21:51
1. 구간분할 및 파생변수 추가
전처리를 진행하면서

이런식으로 양품과 불량에 대한 차이가 없어 추가적인 데이터를 확보할 필요성을 느꼈었습니다.
그래서 이런식으로

구간분할하여 총 100개의 데이터를 300개로 늘렸고
시계열 패턴이 있는 데이터의 특성상

이런식으로 패턴을 정량화 하기 위해 추가적인 변수를 늘리게 되었습니다.
2. 구간 및 파생변수 차이에 대한 검정 진행
추가적인 데이터 확보를 위해 한 노력들이 정말 의미가 있었는지 통계적 검정을 할 필요가 있었습니다.
그래서 어떤식으로 검정을 진행할건지 생각을 많이 했으며 조원들과 토의결과
1. 구간분할 3개 구간
2. 각 구간에 대한 양품 특성치 / 불량 특성치 (군을 비교하는거임) - 3
각 구간에 대한 양품 특성치 이동평균 / 불량 특성치 이동평균 - 3
각 구간에 대한 이동표준편차 - 3
각 구간에 대한 온도변화량 - 3
위와 같이 검정을 진행하는걸로 하였습니다.
어떻게 보면 당연할지도 모르지만 구간분할을 어떻게 할건지 시퀀스는 어떻게 처리할 것인지 등에대해 고민하여 상당한 시간이 걸렸습니다.
코드작성)
기존 작성했던 코드와 분류가 달라서 거의 새로 변수를 만들고 처음부터 다시 작성해야 했습니다.
# 시계열 엔지니어링
mil_test['ampere_lag1'] = mil_test.groupby('sequence_index')['ampere'].shift(1)
mil_test['volt_lag1'] = mil_test.groupby('sequence_index')['volt'].shift(1)
mil_test['temperature_lag1'] = mil_test.groupby('sequence_index')['temperature'].shift(1)
# 이동 평균/표준 편차도 sequence_index 내부에서만 계산
mil_test['전류이동평균'] = mil_test.groupby('sequence_index')['ampere'].rolling(window=60).mean().reset_index(level=0, drop=True)
mil_test['전압이동평균'] = mil_test.groupby('sequence_index')['volt'].rolling(window=60).mean().reset_index(level=0, drop=True)
mil_test['온도이동평균'] = mil_test.groupby('sequence_index')['temperature'].rolling(window=60).mean().reset_index(level=0, drop=True)
mil_test['전류이동표준편차'] = mil_test.groupby('sequence_index')['ampere'].rolling(window=120).std().reset_index(level=0, drop=True)
mil_test['전압이동표준편차'] = mil_test.groupby('sequence_index')['volt'].rolling(window=120).std().reset_index(level=0, drop=True)
mil_test['온도이동표준편차'] = mil_test.groupby('sequence_index')['temperature'].rolling(window=120).std().reset_index(level=0, drop=True)
mil_test['△전류']=mil_test['ampere'].diff()
mil_test['△전압']=mil_test['volt'].diff()
mil_test['△온도']=mil_test['temperature'].diff()
# 파생변수 엔지니어링
# 시퀀스별 피막 생성시간 변수
mil_test = mil_test.sort_values(['sequence_index', 'pk_datetime']).copy()
mil_test['pk_datetime'] = pd.to_datetime(mil_test['pk_datetime'])
# 시퀀스별 생성시간(각 행에 동일 값으로 붙음)
mil_test['생성시간'] = (
mil_test.groupby('sequence_index')['pk_datetime']
.transform(lambda x: x.max() - x.min())
)
mil_test['시간변화량(초)'] = mil_test['생성시간'].dt.total_seconds()
mil_test['두께변화량'] = mil_test['ampere'] * mil_test['시간변화량(초)']
mil_test['최종두께'] = mil_test.groupby('sequence_index')['두께변화량'].transform('sum')
mil_test = mil_test.sort_values(['sequence_index', 'pk_datetime']).copy()
mil_test['pk_datetime'] = pd.to_datetime(mil_test['pk_datetime'])
# 시퀀스별 경과시간 비율(0~1)
t0 = mil_test.groupby('sequence_index')['pk_datetime'].transform('min')
t1 = mil_test.groupby('sequence_index')['pk_datetime'].transform('max')
elapsed = (mil_test['pk_datetime'] - t0).dt.total_seconds()
total = (t1 - t0).dt.total_seconds()
ratio = np.where(total > 0, elapsed / total, 0.0) # 길이가 0인 시퀀스 예외처리
# 3등분: [0~1/3)=0, [1/3~2/3)=1, [2/3~1]=2
mil_test['tertile'] = pd.cut(
ratio,
bins=[-np.inf, 1/3, 2/3, np.inf],
labels=[0, 1, 2]
).astype(int)
cols = [
'volt','ampere','temperature',
'전류이동평균','전압이동평균','온도이동평균',
'전류이동표준편차','전압이동표준편차','온도이동표준편차',
'△전류','△전압','△온도'
]
def mw_by_tertile(df, cols, label_col='failure', tertile_col='tertile',
good_label=1, bad_label=-1):
rows = []
for t in sorted(df[tertile_col].dropna().unique()):
df_t = df[df[tertile_col] == t]
for c in cols:
# 결측 제거
g = df_t.loc[df_t[label_col] == good_label, c].dropna()
b = df_t.loc[df_t[label_col] == bad_label, c].dropna()
n_g, n_b = len(g), len(b)
# 한쪽 표본이 없거나 너무 작으면 스킵
if n_g < 1 or n_b < 1:
rows.append({
'tertile': t, 'column': c,
'n_good': n_g, 'n_bad': n_b,
'U': np.nan, 'p': np.nan,
'effect_rank_biserial': np.nan,
'note': 'sample 부족'
})
continue
# Mann–Whitney U (양측검정)
u, p = mannwhitneyu(g, b, alternative='two-sided')
# 효과크기: rank-biserial correlation ([-1,1])
# rbc = 1 - 2U/(n1*n2) (여기서는 good vs bad 기준)
rbc = 1 - (2 * u) / (n_g * n_b)
rows.append({
'tertile': t, 'column': c,
'n_good': n_g, 'n_bad': n_b,
'U': u, 'p': round(p,4),
'effect_rank_biserial': rbc,
'note': ''
})
out = pd.DataFrame(rows)
# (선택) tertile 내에서 12개 변수에 대한 다중검정 보정(FDR-BH)
# statsmodels 없으면 이 블록은 주석처리하세요.
try:
from statsmodels.stats.multitest import multipletests
out['p_adj_fdr'] = np.nan
for t in out['tertile'].dropna().unique():
mask = (out['tertile'] == t) & out['p'].notna()
if mask.sum() > 0:
out.loc[mask, 'p_adj_fdr'] = multipletests(out.loc[mask, 'p'], method='fdr_bh')[1]
except Exception:
out['p_adj_fdr'] = np.nan
return out.sort_values(['tertile', 'p'])
result = mw_by_tertile(mil_test, cols)
result
결과

ppt로 깔끔하게 재작성
