順序付き多値分類を回帰問題で解くときの閾値をoptunaで求める
目的
分類クラスが順序付きカテゴリの場合、分類問題としてではなく回帰問題として解く方法がある。
その際に、例えば2.4として予測されたラベルは2とするか3とするかを判別する閾値を最適化したい。
for文を回して0.01刻みで計算して・・・という愚直なやり方でもいいが今回はoptuna
を使う。
モデルはlightGBMを用い、評価指標はF1-macroとする。
データ
挙動を確認するだけなので、House Priceの住宅価格MEDV
を四捨五入して10で割ったものを順序付きラベルとする(12.9ならラベル1、25.5ならラベル2となる)。
import pandas as pd import numpy as np import lightgbm as lgb import optuna import sklearn from sklearn.datasets import load_boston boston_dataset = load_boston() X = pd.DataFrame(boston_dataset["data"], columns=boston_dataset["feature_names"]) y = pd.DataFrame(np.round(boston_dataset["target"]/10,0), columns=["MEDV_per_10"]) # yを四捨五入 boston = pd.concat([X, y], axis=1) boston = boston.query("MEDV_per_10 > 0") # ラベル0は少ないので落とす # ラベル1以上でX,y作り直し X = boston[boston_dataset["feature_names"]] y = boston['MEDV_per_10'] y = y - 1.0 # 0スタートにする train_x, test_x, train_y, test_y = train_test_split(X, y, stratify=y,random_state=0)
分類問題として解く
通常通り分類問題として解くと以下のコードとなる。
def main(): num_labels = len(y.unique()) param = { 'objective': 'multiclass', 'metric': 'multi_logloss', 'num_class': len(y.unique()), 'num_leavrs': 3, 'learning_rate': 0.1 } train_xy = lgb.Dataset(train_x, train_y) val_xy = lgb.Dataset(test_x, test_y, reference=train_xy) gbm = lgb.train(param, train_xy, valid_sets = val_xy, verbose_eval=False) pred_proba = gbm.predict(test_x) pred_proba_reshape = pred_proba.reshape(num_labels, len(pred_proba)) # argmaxでlabelを取ってきやすい形にする pred_y = np.argmax(pred_proba_reshape, axis=0) # 最尤と判断したクラスを選ぶ return sklearn.metrics.f1_score(test_y, pred_y, average='macro',) print('F1:', main()) # => F1: 0.17302174216620217
F1の結果は0.17
。低すぎではという疑問はありつついったん次にいく。
回帰問題として解く
連続値の予測部分
いったん回帰モデルで予測した部分のみ抜粋する。
param = { 'objective': 'regression', 'boosting_type': 'gbdt', 'num_leavrs': 3, 'learning_rate': 0.1 } train_xy = lgb.Dataset(train_x, train_y) val_xy = lgb.Dataset(test_x, test_y, reference=train_xy) gbm = lgb.train(param, train_xy, valid_sets = val_xy, verbose_eval=False) # testの予測(連続値) pred_cont_y = gbm.predict(test_x)
パターン1.Clipping+四捨五入でラベル化する
いったん、閾値をもとに予測された連続値をラベルに変換するのではなく単純にClipping+四捨五入でラベル変換をおこなう
pred_label_y = np.clip(pred_cont_y, min(y), max(y)).round() # clip+round f1 = sklearn.metrics.f1_score(test_y, pred_label_y, average='macro',) print('F1:', f1) # => F1: 0.6679785299898898
このとき、F1: 0.6679785299898898
となる。
パターン2. Optunaで閾値を探索してラベル化する
本題。OptunaでF1最大となる閾値を探索する。
なお、基本的な使い方は過去記事に書いた
今回はラベルは0~4の5種類ある。そのため、0と1の閾値、1と2の閾値...といったように閾値を4つ作成する必要がある。
計算時に変数を用いる
今回は連続予測値y
をラベル予測値y
に変換(ここを上手く最適化したい)した値をy実測値
と照らしあわせてF1を計算し、これを最適化する。そのため、Optunaの目的関数としてこの連続予測値y
、y実測値
変数が必要となる。
しかし、Optunaの目的関数には以下の記事
Optunaは
study.optimize
は関数オブジェクトを受け取っている。言い換えると objective は trial : optuna.Trial というただひとつの引数だけを受付ける関数であるという前提がある。
とあるように、trial : optuna.Trial
以外の引数は受付けないため計算に変数を渡すことができない。
対応策1.高階関数
この対応としては1つは上記記事のように高階関数として記載する方法がある。
import warnings def objective(true_y, pred_cont_y): def _objective(trial): # 引数 (trial) はTrial型の値 warnings.simplefilter('ignore', category=RuntimeWarning) # RuntimeWarningを無視扱いに設定 num_labels = len(true_y.unique()) labels = np.unique(true_y) thresholds = [] for i in range(num_labels - 1): # 閾値数=label-1 # 探索範囲: i=0ではラベル最小(0) ~ ラベル最大、それ以外ではi-1の閾値 ~ ラベル最大 low = max(thresholds) if i > 0 else min(labels) # 下限:i-1の閾値最小(i=0のときはラベルの下限) high = max(labels) # 上限:ラベル最大 # 閾値の最適候補を探索し、追加する t = trial.suggest_uniform(f't{i}', low, high) thresholds.append(t) pred_label_y = pd.cut(pred_cont_y, [-np.inf] + thresholds + [np.inf], # [-inf, thresholds, inf]で区切られる labels=labels # 区切りに対してラベルを対応させる ) return sklearn.metrics.f1_score(test_y, pred_label_y, average='macro',) return _objective
この書き方でもいいが、functools.partial
を用いて以下のように表現する方法もある。
def tmp_objective(true_y, pred_cont_y, trial):# 引数 (trial) はTrial型の値 warnings.simplefilter('ignore', category=RuntimeWarning) # RuntimeWarningを無視扱いに設定 num_labels = len(true_y.unique()) labels = np.unique(true_y) thresholds = [] for i in range(num_labels - 1): # 閾値数=label-1 # 探索範囲: i=0ではラベル最小(0) ~ ラベル最大、それ以外ではi-1の閾値 ~ ラベル最大 low = max(thresholds) if i > 0 else min(labels) # 下限:i-1の閾値最小(i=0のときはラベルの下限) high = max(labels) # 上限:ラベル最大 # 閾値の最適候補を探索し、追加する t = trial.suggest_uniform(f't{i}', low, high) thresholds.append(t) pred_label_y = pd.cut(pred_cont_y, [-np.inf] + thresholds + [np.inf], # [-inf, thresholds, inf]で区切られる labels=labels # 区切りに対してラベルを対応させる ) return sklearn.metrics.f1_score(test_y, pred_label_y, average='macro',) objective = partial(tmp_objective, test_y, pred_cont_y)
対応策2. Objective Class+call
他の対応としては、Classを作成し、__call__
で最適化したい計算を返すようにすることでstudy.optimize
時にこのClassのインスタンスを関数のように用いることができる。
そして、インスタンス変数として使用したい引数を渡す仕様にすると結果的に計算に変数を渡すことができる。
class OptunaRounder: def __init__(self, true_y, pred_cont_y): self.true_y = true_y self.pred_cont_y = pred_cont_y self.labels = np.unique(true_y) self.num_labels = len(self.labels) def __call__(self, trial): """最大化したい目的関数""" thresholds = [] for i in range(self.num_labels - 1): # 閾値数=label-1 # 探索範囲: i=0ではラベル最小(0) ~ ラベル最大、それ以外ではi-1の閾値 ~ ラベル最大 low = max(thresholds) if i > 0 else min(self.labels) # 下限:i-1の閾値最小(i=0のときはラベルの下限) high = max(self.labels) # 上限:ラベル最大 # 閾値の最適候補を探索し、追加する t = trial.suggest_uniform(f't{i}', low, high) thresholds.append(t) pred_label_y = pd.cut(self.pred_cont_y, [-np.inf] + thresholds + [np.inf], # [-inf, thresholds, inf]で区切られる labels=self.labels # 区切りに対してラベルを対応させる ) return sklearn.metrics.f1_score(self.true_y, pred_label_y, average='macro',) objective = OptunaRounder(test_y, pred_cont_y)
[参考]
最適化の実行
前述のなにかしらの方法で、閾値をもとにラベル変換してF1計算の結果が返ってくるobjective関数(インスタンス)を作成し最適化をおこなう。
study = optuna.create_study(direction="maximize") # 最適化処理を管理するstudyオブジェクト study.optimize(objective, # 目的関数 n_trials=30, # トライアル数 timeout=60 ) print(study.best_value) # => 0.7358186890336309
F1は0.7358186890336309
でClipping+四捨五入の0.6679785299898898
より改善している。
なお、今回はtrialは30としているがもっと回すとより高い値になる可能性もある(ちなみに、ときどきtrial途中で詰まって終わらないことがあり、timeout
設定をしても強制終了してくれないことがあってよくわからん。。。)
ちなみにこのときの閾値は以下
print(study.best_params)
{'t0': 0.6448355141969913, 't1': 1.6126196134062987, 't2': 2.67914828011351, 't3': 2.9543708468573566}