まずは蝋の翼から。

学んだことを書きながら確認・整理するためのメモブログ。こういうことなのかな?といったことをふわっと書いたりしていますが、理解が浅いゆえに的はずれなことも多々あると思うのでツッコミ歓迎

順序付き多値分類を回帰問題で解くときの閾値をoptunaで求める

目的

分類クラスが順序付きカテゴリの場合、分類問題としてではなく回帰問題として解く方法がある。

その際に、例えば2.4として予測されたラベルは2とするか3とするかを判別する閾値を最適化したい。

for文を回して0.01刻みで計算して・・・という愚直なやり方でもいいが今回はoptunaを使う。

モデルはlightGBMを用い、評価指標はF1-macroとする。

データ

挙動を確認するだけなので、House Priceの住宅価格MEDVを四捨五入して10で割ったものを順序付きラベルとする(12.9ならラベル1、25.5ならラベル2となる)。

import pandas as pd
import numpy as np
import lightgbm as lgb
import optuna
import sklearn
from sklearn.datasets import load_boston

boston_dataset = load_boston()

X = pd.DataFrame(boston_dataset["data"], columns=boston_dataset["feature_names"])
y = pd.DataFrame(np.round(boston_dataset["target"]/10,0), columns=["MEDV_per_10"]) # yを四捨五入
boston = pd.concat([X, y], axis=1)
boston = boston.query("MEDV_per_10 > 0") # ラベル0は少ないので落とす

# ラベル1以上でX,y作り直し
X = boston[boston_dataset["feature_names"]]
y = boston['MEDV_per_10']

y = y - 1.0 # 0スタートにする
train_x, test_x, train_y, test_y = train_test_split(X, y, stratify=y,random_state=0)

分類問題として解く

通常通り分類問題として解くと以下のコードとなる。

def main():

    num_labels = len(y.unique())
    param = {
        'objective': 'multiclass',
        'metric': 'multi_logloss',
        'num_class': len(y.unique()),
        'num_leavrs': 3,
        'learning_rate': 0.1
    }

    train_xy = lgb.Dataset(train_x, train_y)
    val_xy = lgb.Dataset(test_x, test_y, reference=train_xy)


    gbm = lgb.train(param, 
                    train_xy, 
                    valid_sets = val_xy,
                    verbose_eval=False)

    pred_proba = gbm.predict(test_x)
    pred_proba_reshape = pred_proba.reshape(num_labels, len(pred_proba)) # argmaxでlabelを取ってきやすい形にする
    pred_y = np.argmax(pred_proba_reshape, axis=0)  # 最尤と判断したクラスを選ぶ
    
    return sklearn.metrics.f1_score(test_y, 
                                    pred_y,
                                    average='macro',)

print('F1:', main())
# => F1: 0.17302174216620217

F1の結果は0.17。低すぎではという疑問はありつついったん次にいく。

回帰問題として解く

連続値の予測部分

いったん回帰モデルで予測した部分のみ抜粋する。

param = {
    'objective': 'regression',
    'boosting_type': 'gbdt',
    'num_leavrs': 3,
    'learning_rate': 0.1
}

train_xy = lgb.Dataset(train_x, train_y)
val_xy = lgb.Dataset(test_x, test_y, reference=train_xy)

gbm = lgb.train(param, 
                train_xy, 
                valid_sets = val_xy,
                verbose_eval=False)

# testの予測(連続値)
pred_cont_y = gbm.predict(test_x)

パターン1.Clipping+四捨五入でラベル化する

いったん、閾値をもとに予測された連続値をラベルに変換するのではなく単純にClipping+四捨五入でラベル変換をおこなう

pred_label_y = np.clip(pred_cont_y, min(y), max(y)).round() # clip+round
f1 =  sklearn.metrics.f1_score(test_y, 
                               pred_label_y,
                               average='macro',)

print('F1:', f1)
# => F1: 0.6679785299898898

このとき、F1: 0.6679785299898898となる。

パターン2. Optunaで閾値を探索してラベル化する

本題。OptunaでF1最大となる閾値を探索する。

なお、基本的な使い方は過去記事に書いた

knknkn.hatenablog.com

今回はラベルは0~4の5種類ある。そのため、0と1の閾値、1と2の閾値...といったように閾値を4つ作成する必要がある。

計算時に変数を用いる

今回は連続予測値yラベル予測値yに変換(ここを上手く最適化したい)した値をy実測値と照らしあわせてF1を計算し、これを最適化する。そのため、Optunaの目的関数としてこの連続予測値yy実測値変数が必要となる。
しかし、Optunaの目的関数には以下の記事

tech.515hikaru.net

Optunaはstudy.optimizeは関数オブジェクトを受け取っている。言い換えると objective は trial : optuna.Trial というただひとつの引数だけを受付ける関数であるという前提がある。

とあるように、trial : optuna.Trial以外の引数は受付けないため計算に変数を渡すことができない。

対応策1.高階関数

この対応としては1つは上記記事のように高階関数として記載する方法がある。

import warnings

def objective(true_y, pred_cont_y):
    def _objective(trial): # 引数 (trial) はTrial型の値
        warnings.simplefilter('ignore', category=RuntimeWarning)  # RuntimeWarningを無視扱いに設定
        
        num_labels = len(true_y.unique())
        labels = np.unique(true_y)

        thresholds = []

        for i in range(num_labels - 1): # 閾値数=label-1
            # 探索範囲: i=0ではラベル最小(0) ~ ラベル最大、それ以外ではi-1の閾値 ~ ラベル最大
            low = max(thresholds) if i > 0 else min(labels) # 下限:i-1の閾値最小(i=0のときはラベルの下限)
            high = max(labels) # 上限:ラベル最大

            # 閾値の最適候補を探索し、追加する
            t = trial.suggest_uniform(f't{i}', low, high)
            thresholds.append(t)


        pred_label_y = pd.cut(pred_cont_y,
                              [-np.inf] + thresholds + [np.inf], # [-inf, thresholds, inf]で区切られる
                              labels=labels # 区切りに対してラベルを対応させる
                             )

        return sklearn.metrics.f1_score(test_y, 
                                        pred_label_y,
                                        average='macro',)
    return _objective

この書き方でもいいが、functools.partialを用いて以下のように表現する方法もある。

def tmp_objective(true_y, pred_cont_y, trial):# 引数 (trial) はTrial型の値
    warnings.simplefilter('ignore', category=RuntimeWarning)  # RuntimeWarningを無視扱いに設定

    num_labels = len(true_y.unique())
    labels = np.unique(true_y)

    thresholds = []

    for i in range(num_labels - 1): # 閾値数=label-1
        # 探索範囲: i=0ではラベル最小(0) ~ ラベル最大、それ以外ではi-1の閾値 ~ ラベル最大
        low = max(thresholds) if i > 0 else min(labels) # 下限:i-1の閾値最小(i=0のときはラベルの下限)
        high = max(labels) # 上限:ラベル最大

        # 閾値の最適候補を探索し、追加する
        t = trial.suggest_uniform(f't{i}', low, high)
        thresholds.append(t)


    pred_label_y = pd.cut(pred_cont_y,
                          [-np.inf] + thresholds + [np.inf], # [-inf, thresholds, inf]で区切られる
                          labels=labels # 区切りに対してラベルを対応させる
                         )

    return sklearn.metrics.f1_score(test_y, 
                                    pred_label_y,
                                    average='macro',)

objective = partial(tmp_objective, test_y, pred_cont_y)

対応策2. Objective Class+call

他の対応としては、Classを作成し、__call__で最適化したい計算を返すようにすることでstudy.optimize時にこのClassのインスタンスを関数のように用いることができる。
そして、インスタンス変数として使用したい引数を渡す仕様にすると結果的に計算に変数を渡すことができる。

class OptunaRounder:

    def __init__(self, true_y, pred_cont_y):
        self.true_y = true_y
        self.pred_cont_y = pred_cont_y
        self.labels = np.unique(true_y)
        self.num_labels = len(self.labels)

    def __call__(self, trial):
        """最大化したい目的関数"""
        thresholds = []

        for i in range(self.num_labels - 1): # 閾値数=label-1
            # 探索範囲: i=0ではラベル最小(0) ~ ラベル最大、それ以外ではi-1の閾値 ~ ラベル最大
            low = max(thresholds) if i > 0 else min(self.labels) # 下限:i-1の閾値最小(i=0のときはラベルの下限)
            high = max(self.labels) # 上限:ラベル最大

            # 閾値の最適候補を探索し、追加する
            t = trial.suggest_uniform(f't{i}', low, high)
            thresholds.append(t)


        pred_label_y = pd.cut(self.pred_cont_y,
                              [-np.inf] + thresholds + [np.inf], # [-inf, thresholds, inf]で区切られる
                              labels=self.labels # 区切りに対してラベルを対応させる
                             )

        return sklearn.metrics.f1_score(self.true_y, 
                                        pred_label_y,
                                        average='macro',)

objective = OptunaRounder(test_y, pred_cont_y)

[参考]

blog.amedama.jp

最適化の実行

前述のなにかしらの方法で、閾値をもとにラベル変換してF1計算の結果が返ってくるobjective関数(インスタンス)を作成し最適化をおこなう。

study = optuna.create_study(direction="maximize") # 最適化処理を管理するstudyオブジェクト
study.optimize(objective, # 目的関数
               n_trials=30, # トライアル数
               timeout=60
              )

print(study.best_value)
# => 0.7358186890336309

F1は0.7358186890336309でClipping+四捨五入の0.6679785299898898より改善している。
なお、今回はtrialは30としているがもっと回すとより高い値になる可能性もある(ちなみに、ときどきtrial途中で詰まって終わらないことがあり、timeout設定をしても強制終了してくれないことがあってよくわからん。。。)

ちなみにこのときの閾値は以下

print(study.best_params)
{'t0': 0.6448355141969913,
 't1': 1.6126196134062987,
 't2': 2.67914828011351,
 't3': 2.9543708468573566}