카테고리 없음
실전프로젝트 3일차 - 가설검정 및 머신러닝 모델 테스트
iron-min
2025. 12. 4. 23:55
1. 추가 가설 검정
어제 가설검정을 한 결과 시퀀스별 평균낸 값들이 양품/불량 특성치로 나눠 검정했을때 전부 같다고 나와서 문제가 있었습니다.
그런데 튜터님께서 해당경우는 표본이 너무 적어서 그럴수있다고 하셔서(불량 11개, 양품89개) 그냥 시퀀스로 나누지 말고 불량 양품으로만 전부 비교해보라고 하셨습니다.
1-1. 전체 시퀀스 특성치 비교
mw_sq_total = []
for col in ['volt', 'ampere', 'temperature', '시간변화량(초)','두께변화량']:
stat, p = mannwhitneyu(df_success[col],
df_fail[col],
alternative='two-sided')
mw_sq_total.append({
'Column': col,
'U-statistic': round(stat, 4),
'p-value': round(p, 4),
'유의성 (p < 0.05)': '차이가 있다' if p < 0.05 else '차이가 없다'
})
mw_sq_total_result = pd.DataFrame(mw_sq_total)
print(mw_sq_total_result)

1-2. 전체 시퀀스 등분산성 검정
from scipy.stats import levene
levene_results = []
for col in ['volt', 'ampere', 'temperature', '시간변화량(초)','두께변화량']:
stat, p = levene(df_success[col], df_fail[col])
levene_results.append({
'Column': col,
'Levene_statistic': round(stat, 4),
'p-value': round(p, 4),
'등분산을 만족하는가 (p ≥ 0.05)': '등분산이다.' if p >= 0.05 else '아니다'
})
levene_df = pd.DataFrame(levene_results)
print(levene_df)

1-3. 전체 시퀀스 변화율 비교
df_fail = df_mil[df_mil['failure'] == -1]
df_success = df_mil[df_mil['failure'] == 1]
mw_delta_total = []
for col in ['dI_dt_ampere', 'dV_dt_volt', 'dT_dt_temperature','dThickness_dt']:
stat, p = mannwhitneyu(df_success[col].dropna(),
df_fail[col].dropna(),
alternative='two-sided')
mw_delta_total.append({
'Column': col,
'U-statistic': round(stat, 4),
'p-value': round(p, 4),
'유의성 (p < 0.05)': '차이가 있다' if p < 0.05 else '차이가 없다'
})
mw_delta_total_result = pd.DataFrame(mw_delta_total)
print(mw_delta_total_result)

1-4. 전체 시퀀스 산포 비교
전체 시퀀스 산포도 Levene 검정으로 검정했는데 코드가 날아가버렸습니다.
결과는 위 4개 변화율 요인에 대해서 모두 차이가 없는걸로 나타났습니다.
1-5. 결론
시퀀스별 평균을 낸뒤에 평균,산포,변화율 비교

군내산포를 고려하지 않고 군간만 비교했을때 양품과 불량품의 차이가 없다는 것을 알 수 있음.
즉, 양품과 불량품의 시퀀스별 차이는 별로 나지 않는다. 둘다 특정 패턴을 따르고 있다는것으로 추측
시퀀스별 평균을 낸뒤에 평균,산포,변화율 비교

군내+군간 산포를 고려했을때 양품과 불량품의 차이가 있음.
즉, 양품과 불량품의 군내 특성치 및 산포가 다름.
결론
시퀀스 내의 패턴/변동성을 가지고 양품과 불량품을 구별해야함.
2. 머신러닝 적용
2-1. 머신러닝 준비단계
features_to_use = ['failure','volt', 'ampere', 'temperature', '시간변화량(초)',
'두께변화량','dI_dt_ampere','dV_dt_volt','dT_dt_temperature','dThickness_dt']
df_mil_avg = df_mil.groupby('sequence_index').mean()
clean_df_avg = df_mil_avg[features_to_use]
clean_df_avg

from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import StratifiedKFold, cross_validate
from sklearn.metrics import make_scorer, precision_score, recall_score, f1_score
# X (독립변수)와 y (종속변수) 분리
X = clean_df_avg.drop('failure', axis=1)
y = clean_df_avg['failure']
# 훈련/테스트 데이터 분할
X_train, X_test, y_train, y_test = train_test_split(
X, y,
test_size=0.2, # 20%를 테스트용으로
random_state=42, # 재현 가능한 결과
stratify=y # 클래스 비율 유지
)
2-2. Decision Tree
pipe = Pipeline([
('smote', SMOTE(k_neighbors=2, random_state=42)),
('clf', DecisionTreeClassifier(
criterion='gini',
max_depth=8,
min_samples_split=10,
min_samples_leaf=10,
random_state=42
))
])
cv = StratifiedKFold(
n_splits=5, # 5-fold
shuffle=True,
random_state=42
)
scoring = {
'precision': make_scorer(precision_score),
'recall': make_scorer(recall_score),
'f1': make_scorer(f1_score),
'accuracy': 'accuracy'
}
cv_results = cross_validate(
pipe,
X, y, # 전체 데이터에 대해 CV
cv=cv,
scoring=scoring,
return_train_score=False,
n_jobs=-1 # CPU 여러 코어 사용 (옵션)
)
# 폴드별 결과 + 평균 출력
for metric in scoring.keys():
scores = cv_results[f'test_{metric}']
print(f"{metric} 각 fold 점수: {scores}")
print(f"{metric} 평균: {np.mean(scores):.4f}")
print("-" * 40)

from sklearn.metrics import classification_report, confusion_matrix
# 1) train 데이터로 파이프라인 학습 (여기서 내부적으로 train에만 SMOTE 적용됨)
pipe.fit(X_train, y_train)
# 2) test 데이터 예측
y_pred = pipe.predict(X_test)
# 3) 성능 평가
print(classification_report(y_test, y_pred))
print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred))

2-3. RandomFroest
from sklearn.ensemble import RandomForestClassifier
rf_pipe = Pipeline([
('smote', SMOTE(k_neighbors=2, random_state=42)),
('clf', RandomForestClassifier(
n_estimators=200, # 트리 개수
max_depth=None, # 깊이 제한 없음 (필요하면 숫자로 제한 가능)
min_samples_split=10, # 노드 분할 최소 샘플 수
min_samples_leaf=5, # 리프 노드 최소 샘플 수
n_jobs=-1, # 코어 모두 사용
random_state=42
))
])
# Stratified K-Fold 설정 (DT 때 썼던 그대로)
cv = StratifiedKFold(
n_splits=5,
shuffle=True,
random_state=42
)
# scoring도 DT 때랑 동일하게
scoring = {
'precision': make_scorer(precision_score),
'recall': make_scorer(recall_score),
'f1': make_scorer(f1_score),
'accuracy': 'accuracy'
}
# 교차검증 실행
rf_cv_results = cross_validate(
rf_pipe,
X, y,
cv=cv,
scoring=scoring,
return_train_score=False,
n_jobs=-1
)
# 결과 출력
for metric in scoring.keys():
scores = rf_cv_results[f'test_{metric}']
print(f"{metric} 각 fold 점수: {scores}")
print(f"{metric} 평균: {np.mean(scores):.4f}")
print("-" * 40)

# 1) train 데이터로 RF 파이프라인 학습
rf_pipe.fit(X_train, y_train)
# 2) test 데이터 예측
y_pred_rf = rf_pipe.predict(X_test)
# 3) 성능 평가
print(classification_report(y_test, y_pred_rf))
print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred_rf))

2-4. XGBOOST
from xgboost import XGBClassifier
#XGBOOST 용 이진 레이블
y_xgb = (y == 1.0).astype(int)
xgb_pipe = Pipeline([
('smote', SMOTE(k_neighbors=2, random_state=42)),
('clf', XGBClassifier(
n_estimators=300,
max_depth=5,
learning_rate=0.1,
subsample=0.8,
colsample_bytree=0.8,
reg_lambda=1.0,
objective='binary:logistic',
eval_metric='logloss', # 경고 제거용
n_jobs=-1,
random_state=42
))
])
cv = StratifiedKFold(
n_splits=5,
shuffle=True,
random_state=42
)
scoring = {
'precision': make_scorer(precision_score), # pos_label=1 (기본값)
'recall': make_scorer(recall_score),
'f1': make_scorer(f1_score),
'accuracy': 'accuracy'
}
xgb_cv_results = cross_validate(
xgb_pipe,
X, y_xgb, # 👈 여기서 y_xgb 사용
cv=cv,
scoring=scoring,
return_train_score=False,
n_jobs=-1
)
for metric in scoring.keys():
scores = xgb_cv_results[f'test_{metric}']
print(f"{metric} 각 fold 점수: {scores}")
print(f"{metric} 평균: {np.mean(scores):.4f}")
print("-" * 40)

X_train, X_test, y_train_raw, y_test_raw = train_test_split(
X, y,
test_size=0.2,
random_state=42,
stratify=y
)
# XGBoost용 0/1 레이블
y_train = (y_train_raw == 1.0).astype(int)
y_test = (y_test_raw == 1.0).astype(int)
# 1) train 데이터로 XGBoost 파이프라인 학습
xgb_pipe.fit(X_train, y_train)
# 2) test 데이터 예측
y_pred_xgb = xgb_pipe.predict(X_test)
# 3) 성능 평가
print(classification_report(y_test, y_pred_xgb))
print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred_xgb))

2-5. LIGHT GBM
from lightgbm import LGBMClassifier
y_lgb = (y == 1.0).astype(int)
lgb_pipe = Pipeline([
('smote', SMOTE(k_neighbors=2, random_state=42)),
('clf', LGBMClassifier(
n_estimators=300,
learning_rate=0.05,
num_leaves=31,
max_depth=5, # 제한 없음 (필요하면 숫자로 제한)
subsample=0.8, # bagging_fraction
colsample_bytree=0.8, # feature_fraction
reg_lambda=1.0,
objective='binary',
n_jobs=-1,
random_state=42
))
])
cv = StratifiedKFold(
n_splits=5,
shuffle=True,
random_state=42
)
scoring = {
'precision': make_scorer(precision_score),
'recall': make_scorer(recall_score),
'f1': make_scorer(f1_score),
'accuracy': 'accuracy'
}
lgb_cv_results = cross_validate(
lgb_pipe,
X, y_lgb,
cv=cv,
scoring=scoring,
return_train_score=False,
n_jobs=-1
)
for metric in scoring.keys():
scores = lgb_cv_results[f'test_{metric}']
print(f"{metric} 각 fold 점수: {scores}")
print(f"{metric} 평균: {np.mean(scores):.4f}")
print("-" * 40)

# LightGBM용 0/1 레이블
y_train = (y_train_raw == 1.0).astype(int)
y_test = (y_test_raw == 1.0).astype(int)
# 1) train 데이터로 LGBM 파이프라인 학습
lgb_pipe.fit(X_train, y_train)
# 2) test 데이터 예측
y_pred_lgb = lgb_pipe.predict(X_test)
# 3) 성능 평가
print(classification_report(y_test, y_pred_lgb))
print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred_lgb))

2-6. 결론
대체로 TREE 모델보다 부스팅 모델이 결과가 잘나온것을 볼 수 있습니다.
내일 모델 시각화를 해볼 예정입니다.