def_benchmark_ope_ONLY_PI.py
from logging import getLogger
from pathlib import Path
import time
import warnings
import hydra
import numpy as np
from omegaconf import DictConfig
from pandas import DataFrame
import pingouin as pg
from DeficientSupportInterpretableOPEEvaluator import DeficientSupportInterpretableOPEEvaluator
from sklearn.experimental import enable_hist_gradient_boosting  # noqa: F401  # needed to expose HistGradientBoostingClassifier on scikit-learn < 1.0
from sklearn.ensemble import HistGradientBoostingClassifier as LightGBM
from sklearn.ensemble import RandomForestClassifier as RandomForest
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import LogisticRegression
import json
from obp.dataset import OpenBanditDataset
from obp.ope import SelfNormalizedInverseProbabilityWeighting
from obp.policy import BernoulliTS
from obp.policy import Random
from OpenBanditDatasetSideInfo import OpenBanditDatasetSideInfo
from SideInformationEstimators.DeficientSupport.Clustering.IPSClustering import SelfNormInverseProbabilityWeightingClustering
from SideInformationEstimators.DeficientSupport.PseudoInverse.SideInformationPseudoInverse import (
    SelfNormSideInformationPseudoInverse,
)
logger = getLogger(__name__)
warnings.filterwarnings(action="ignore", category=ConvergenceWarning)
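# map the regression-model name given in the config to its scikit-learn class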
reg_model_dict = dict(
logistic_regression=LogisticRegression,
random_forest=RandomForest,
lightgbm=LightGBM,
)
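# hydra entry point: experiment settings are read from the config in ./my_benchmark/conf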
@hydra.main(config_path="./my_benchmark/conf", config_name="config")
def main(cfg: DictConfig) -> None:
print(cfg)
logger.info(f"The current working directory is {Path().cwd()}")
start_time = time.time()
logger.info("initializing experimental condition..")
# compared ope estimators
# configurations
n_seeds = cfg.setting.n_seeds
sample_size = cfg.setting.sample_size
reg_model = cfg.setting.reg_model
campaign = cfg.setting.campaign
behavior_policy = cfg.setting.behavior_policy
test_size = cfg.setting.test_size
is_timeseries_split = cfg.setting.is_timeseries_split
n_folds = cfg.setting.n_folds
n_deficient_actions = cfg.setting.n_deficient_actions
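    # path to the full Open Bandit Dataset; with data_path=None, obp falls back to its bundled sample data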
obd_path = (
Path().cwd().parents[5] / "open_bandit_dataset"
if cfg.setting.is_full_obd
else None
)
random_state = cfg.setting.random_state
np.random.seed(random_state)
# define dataset
dataset_ts = OpenBanditDataset(
behavior_policy="bts", campaign=campaign, data_path=obd_path
)
dataset_ur = OpenBanditDatasetSideInfo(
behavior_policy="random", campaign=campaign, data_path=obd_path
)
# prepare logged bandit feedback and evaluation policies
if behavior_policy == "random":
if is_timeseries_split:
bandit_feedback_ur = dataset_ur.obtain_batch_bandit_feedback(
test_size=test_size,
is_timeseries_split=True,
)[0]
else:
bandit_feedback_ur = dataset_ur.obtain_batch_bandit_feedback()
bandit_feedbacks = [bandit_feedback_ur]
# obtain the ground-truth policy value
ground_truth_ts = OpenBanditDataset.calc_on_policy_policy_value_estimate(
behavior_policy="bts",
campaign=campaign,
data_path=obd_path,
test_size=test_size,
is_timeseries_split=is_timeseries_split,
)
# obtain action choice probabilities and define evaluation policies
policy_ts = BernoulliTS(
n_actions=dataset_ts.n_actions,
len_list=dataset_ts.len_list,
random_state=random_state,
is_zozotown_prior=True,
campaign=campaign,
)
action_dist_ts = policy_ts.compute_batch_action_dist(n_rounds=1000000)
evaluation_policies = [(ground_truth_ts, action_dist_ts)]
else:
if is_timeseries_split:
bandit_feedback_ts = dataset_ts.obtain_batch_bandit_feedback(
test_size=test_size,
is_timeseries_split=True,
)[0]
else:
bandit_feedback_ts = dataset_ts.obtain_batch_bandit_feedback()
bandit_feedbacks = [bandit_feedback_ts]
# obtain the ground-truth policy value
ground_truth_ur = OpenBanditDataset.calc_on_policy_policy_value_estimate(
behavior_policy="random",
campaign=campaign,
data_path=obd_path,
test_size=test_size,
is_timeseries_split=is_timeseries_split,
)
# obtain action choice probabilities and define evaluation policies
policy_ur = Random(
n_actions=dataset_ur.n_actions,
len_list=dataset_ur.len_list,
random_state=random_state,
)
action_dist_ur = policy_ur.compute_batch_action_dist(n_rounds=1000000)
evaluation_policies = [(ground_truth_ur, action_dist_ur)]
# regression models used in ope estimators
hyperparams = dict(cfg.reg_model_hyperparams)[reg_model]
regression_models = [reg_model_dict[reg_model](**hyperparams)]
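    # OPE estimators: the clustering estimator is instantiated for reference,
    # but only the pseudo-inverse estimator is evaluated in this script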
    sn_cl = SelfNormInverseProbabilityWeightingClustering(
        n_clusters=30,
        action_context=dataset_ur.action_context[:, :-1],
    )
    sn_cl.estimator_name = "SN Sim. (cluster)"
    sn_pi = SelfNormSideInformationPseudoInverse()
    sn_pi.estimator_name = "SN PI"
ope_estimators = [
sn_pi,
]
# define an evaluator class
evaluator = DeficientSupportInterpretableOPEEvaluator(
random_states=np.arange(n_seeds),
bandit_feedbacks=bandit_feedbacks,
evaluation_policies=evaluation_policies,
ope_estimators=ope_estimators,
regression_models=regression_models,
n_deficient_actions=n_deficient_actions,
)
# conduct an evaluation of OPE experiment
logger.info("experiment started")
_ = evaluator.estimate_policy_value(sample_size=sample_size, n_folds_=n_folds)
# calculate statistics
root_mse = evaluator.calculate_mean(root=True)
mean_scaled = evaluator.calculate_mean(scale=True, root=True)
# save results of the evaluation of off-policy estimators
log_path = Path("./outputs")
log_path.mkdir(exist_ok=True, parents=True)
# save root mse
root_mse_df = DataFrame()
root_mse_df["estimator"] = list(root_mse.keys())
root_mse_df["mean"] = list(root_mse.values())
root_mse_df["mean(scaled)"] = list(mean_scaled.values())
root_mse_df.to_csv(log_path / "root_mse.csv")
# save mse
mse = evaluator.calculate_mean(root=False)
mse_df = DataFrame()
mse_df["estimator"] = list(mse.keys())
mse_df["mse"] = list(mse.values())
mse_df.to_csv(log_path / "mse.csv")
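    # per-seed squared errors reshaped into long format (one row per random seed and estimator)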
se_df = DataFrame(evaluator.calculate_squared_error())
se_df = DataFrame(se_df.stack()).reset_index(1)
se_df.rename(columns={"level_1": "estimators", 0: "se"}, inplace=True)
evaluator.visualize_cdf_aggregate(fig_dir="figures", fig_name="cdf.png")
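    # dump the raw squared-error dictionary to JSON (numpy arrays converted to lists first)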
res_dict = evaluator.squared_error
for key in res_dict.keys():
if isinstance(res_dict[key], np.ndarray):
res_dict[key] = res_dict[key].tolist()
with open(log_path / "res_dict_ONLY_PI.json", "w") as fp:
json.dump(res_dict, fp)
experiment = f"{campaign}-{behavior_policy}-{sample_size}"
elapsed_time = np.round((time.time() - start_time) / 60, 2)
logger.info(f"finish experiment {experiment} in {elapsed_time}min")
if __name__ == "__main__":
main()