import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
from tqdm import tqdm
from pandarallel import pandarallel
from sklearn.model_selection import train_test_split
# import random
import gc
import ast
import os
import warnings
import joblib

warnings.filterwarnings('ignore')
pd.options.mode.chained_assignment = None
pandarallel.initialize()


def pandas_list_to_array(df):
    """
    Input: DataFrame of shape (x, y), where each cell contains a list of length l
    Return: np.array of shape (x, l, y)
    """
    return np.transpose(
        np.array(df.values.tolist()),
        (0, 2, 1)
    )


def preprocess_inputs(df, cols: list):
    return pandas_list_to_array(
        df[cols]
    )


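# A minimal sketch of what the two helpers above produce, using made-up toy
# data (the column names here are only illustrative):
#
#   toy = pd.DataFrame({'link_time': [[1.0, 2.0, 3.0]],
#                       'link_current_status': [[0, 1, 0]]})
#   preprocess_inputs(toy, cols=['link_time', 'link_current_status']).shape
#   # -> (1, 3, 2): (rows, list length, number of columns)

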
def append_all_data(files_list, file_head_path):
    """
    Concatenate all the data files into one DataFrame.
    :param files_list: the names of the data files
    :param file_head_path: the directory the files live in
    :return: DataFrame holding all the data
    """
    # Read the first file only to grab the column layout for an empty frame
    data_all_path = file_head_path + files_list[0]
    data_all = pd.read_csv(data_all_path)
    data_all = data_all.head(0)
    try:
        del data_all['Unnamed: 0']
    except KeyError:
        pass
    # Loop over the files and concatenate all the data
    # (DataFrame.append was removed in pandas 2.0, so pd.concat is used instead)
    for i in files_list:
        data_path = file_head_path + i
        print("Current file:", data_path)
        data = pd.read_csv(data_path)
        try:
            del data['Unnamed: 0']
        except KeyError:
            pass
        data_all = pd.concat([data_all, data], ignore_index=True)
    return data_all


def file_name(file_dir):
    files_list = []
    for root, dirs, files in os.walk(file_dir):
        # print("success")
        for name in files:
            files_list.append(name)
    return files_list


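# A hedged usage sketch of the two loaders above; the directory and file names
# are hypothetical. file_name() collects the file names under a directory via
# os.walk, and append_all_data() reads and concatenates them in order:
#
#   csv_list = file_name('./daily_orders/')  # e.g. ['order_20200825.csv', ...]
#   csv_list.sort()
#   all_orders = append_all_data(csv_list, './daily_orders/')

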
def load_data(making_data_dir, link_data_dir, cross_data_dir, head_link_dir,
              win_order_data_dir, pre_arrival_sqe_dir, data_for_driver_xw, downstream_status_dir):
    """
    Load the data from each directory, then merge it all.
    :return: all data at order level
    """
    print('-------------LOAD DATA for mk_data----------------')
    mk_list = file_name(making_data_dir)
    mk_list.sort()
    mk_data = append_all_data(mk_list, making_data_dir)
    # mk_data = pd.read_csv('/home/didi2021/didi2021/giscup_2021/final_train_data_0703/max_order_xt/join_20200825.csv')  # for test running
    mk_data['date_time'] = mk_data['date_time'].astype(str)
    mk_data['dayofweek'] = pd.to_datetime(mk_data['date_time'])
    mk_data['dayofweek'] = mk_data['dayofweek'].dt.dayofweek + 1

    weather_le = LabelEncoder()
    mk_data['weather_le'] = weather_le.fit_transform(mk_data['weather'])
    mk_data['driver_id'] = mk_data['driver_id'].astype(str)

    """
    print('-------------LOAD DATA for driver_data----------------')
    driver_list = file_name(data_for_driver_xw)
    driver_list.sort()
    driver_data = append_all_data(driver_list, data_for_driver_xw)
    # driver_data = pd.read_csv('/home/didi2021/didi2021/giscup_2021/final_train_data_0703/data_for_driver_xw/driver_20200825_head.txt')
    driver_data = driver_data[['driver_id', 'date_time', 'entropy', 'hour_mean', 'workday_order', 'weekend_order']]
    driver_data['date_time'] = driver_data['date_time'].astype(str)
    driver_data['driver_id'] = driver_data['driver_id'].astype(str)
    mk_data = mk_data.merge(driver_data, on=['driver_id', 'date_time'], how='left')
    del driver_data
    """

    """
    print('-------------LOAD DATA for downstream_status_for_order----------------')
    ds_data_list = file_name(downstream_status_dir)
    ds_data_list.sort()
    ds_link_data = append_all_data(ds_data_list, downstream_status_dir)
    # ds_link_data = pd.read_csv('/home/didi2021/didi2021/giscup_2021/final_train_data_0703/downstream_status_for_order/ds_for_order_20200825.csv')
    mk_data = mk_data.merge(ds_link_data, on=['order_id'], how='left')
    del ds_link_data
    """

    """
    print('-------------LOAD DATA for rate_status_for_order----------------')
    # rate_data_list = file_name(rate_status_for_order)
    # rate_data_list.sort()
    # rate_data = append_all_data(rate_data_list, rate_status_for_order)
    rate_data = pd.read_csv('/home/didi2021/didi2021/giscup_2021/final_train_data_0703/rate_status_for_order/rate_for_order_20200825.csv')
    mk_data = mk_data.merge(rate_data, on=['order_id'], how='left')
    del rate_data
    """

    print('Remove the wk2_, m1_, and ratio columns')
    del_cols = []
    mk_cols = mk_data.columns.tolist()
    for i in range(len(mk_cols)):
        if 'wk2_' in mk_cols[i]:
            del_cols.append(mk_cols[i])
        if 'm1_' in mk_cols[i]:
            del_cols.append(mk_cols[i])
        if 'ratio' in mk_cols[i]:
            del_cols.append(mk_cols[i])
    del_cols = del_cols + ['date_time_mean', 'weather', 'driver_id', 'date_time_dt', 'link_time_sum', 'date_time_sum']
    print('*-' * 40, 'Columns to be dropped:', del_cols)
    mk_data.drop(columns=del_cols, inplace=True)
    print('The initial shape of mk_data:', mk_data.shape)

    print('-------------LOAD WIN DATA----------------')
    win_order_list = file_name(win_order_data_dir)
    win_order_list.sort()
    win_order_data = append_all_data(win_order_list, win_order_data_dir)
    # win_order_data = pd.read_csv('/home/didi2021/didi2021/giscup_2021/final_train_data_0703/win_order_xw/win_for_slice_20200825.csv')  # for test running
    del_win_order_cols = []
    win_order_cols = win_order_data.columns.tolist()
    for i in range(len(win_order_cols)):
        if 'last_wk_lk_current' in win_order_cols[i]:
            del_win_order_cols.append(win_order_cols[i])
        # if 'distance' in win_order_cols[i]:
        #     del_win_order_cols.append(win_order_cols[i])
        # if '1_percent' in win_order_cols[i]:
        #     del_win_order_cols.append(win_order_cols[i])
        # if '0_percent' in win_order_cols[i]:
        #     del_win_order_cols.append(win_order_cols[i])
    del_win_order_cols = del_win_order_cols + ['slice_id', 'date_time']
    win_order_data.drop(columns=del_win_order_cols, inplace=True)
    print('win_order_data.shape', win_order_data.shape)
    mk_data = pd.merge(mk_data, win_order_data, how='left', on='order_id')
    print('mk_data.shape', mk_data.shape)
    del win_order_data
    gc.collect()

    print('-------------LOAD HEAD DATA----------------')
    head_list = file_name(head_link_dir)
    head_list.sort()
    head_data = append_all_data(head_list, head_link_dir)
    # head_data = pd.read_csv('/home/didi2021/didi2021/giscup_2021/final_train_data_0703/head_link_data_clear/head_link_20200825.csv')  # for test running
    get_head_cols = ['len_tmp', 'status_0', 'status_1', 'status_2', 'status_3', 'status_4',
                     'rate_0', 'rate_1', 'rate_2', 'rate_3', 'rate_4']
    get_head_cols.insert(0, 'order_id')
    print('head_data.shape:', head_data.shape)
    head_data = head_data[get_head_cols]
    print('mk_data.shape', mk_data.shape)
    mk_data = pd.merge(mk_data, head_data, how='left', on='order_id')
    print('mk_data.shape', mk_data.shape)
    del head_data
    gc.collect()

    print('-------------LOAD DATA for link_data----------------')
    link_list = file_name(link_data_dir)
    link_list.sort()
    link_data = append_all_data(link_list, link_data_dir)
    # link_data = pd.read_csv('/home/didi2021/didi2021/giscup_2021/final_train_data_0703/max_170_link_sqe_for_order/sqe_20200825_link.txt')  # for test running
    # del_link_cols = ['link_time_sub', 'link_time_sub_sum', 'link_time_sub_mean', 'link_time_sub_std', 'link_time_sub_skew']
    # link_data.drop(del_link_cols, axis=1, inplace=True)
    print('The initial shape of link_data:', link_data.shape)
    gc.collect()

    print('-------------LOAD DATA for arrival_sqe_data----------------')
    arrival_sqe_list = file_name(pre_arrival_sqe_dir)
    arrival_sqe_list.sort()
    arrival_sqe_data = append_all_data(arrival_sqe_list, pre_arrival_sqe_dir)
    # arrival_sqe_data = pd.read_csv('/home/didi2021/didi2021/giscup_2021/final_train_data_0703/sqe_arrival_for_link/20200825.csv')  # for test running
    del arrival_sqe_data['slice_id']
    del arrival_sqe_data['pre_arrival_status']
    del arrival_sqe_data['arrive_slice_id']
    # Prefix every column except order_id with 'future_'
    arrival_cols = arrival_sqe_data.columns.tolist()
    new_arrival_cols = ['future_' + i for i in arrival_cols if i != 'order_id']
    new_arrival_cols.insert(0, 'order_id')
    arrival_sqe_data.columns = new_arrival_cols
    print('The initial shape of arrival_sqe_data:', arrival_sqe_data.shape)
    link_data = pd.merge(link_data, arrival_sqe_data, how='left', on='order_id')
    del arrival_sqe_data
    gc.collect()
    link_cols_list = ['link_id', 'link_time', 'link_current_status', 'pr', 'dc']

    print('-------------LOAD DATA for cross_data----------------')
    cross_list = file_name(cross_data_dir)
    cross_list.sort()
    cross_data = append_all_data(cross_list, cross_data_dir)
    # cross_data = pd.read_csv('/home/didi2021/didi2021/giscup_2021/final_train_data_0703/for_0714_cross_sqe_for_order/sqe_20200825_cross.txt')  # for test running
    del_cross_cols = ['cr_t_sub_by_min', 'cr_t_sub_by_q50', 'total_crosstime_std']
    cross_data.drop(columns=del_cross_cols, inplace=True)
    print('The initial shape of cross_data:', cross_data.shape)
    cross_cols_list = ['cross_id', 'cross_time']

    data = pd.merge(mk_data, link_data, how='left', on='order_id')
    del mk_data
    del link_data
    gc.collect()
    data = pd.merge(data, cross_data, how='left', on='order_id')
    del cross_data
    gc.collect()

    # Remove the class-type, id, and label columns, keeping only the deep inputs
    mk_cols_list = data.columns.tolist()
    remove_mk_cols = ['order_id', 'slice_id', 'hightemp', 'lowtemp', 'weather_le', 'dayofweek', 'date_time', 'ata']
    mk_cols_list = list(set(mk_cols_list) - set(remove_mk_cols))
    mk_cols_list = list(set(mk_cols_list) - set(link_cols_list))
    mk_cols_list = list(set(mk_cols_list) - set(cross_cols_list))
    print('length of mk_cols_list', len(mk_cols_list))
    print('*-' * 40)
    print('The final shape of data is:', data.shape)

    return data, mk_cols_list, link_cols_list, cross_cols_list


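# A hedged sketch of a load_data() call. The directory names follow the
# commented-out "for test running" paths above, but the exact layout is an
# assumption:
#
#   base = '/home/didi2021/didi2021/giscup_2021/final_train_data_0703/'
#   data, mk_cols, link_cols, cross_cols = load_data(
#       base + 'max_order_xt/', base + 'max_170_link_sqe_for_order/',
#       base + 'for_0714_cross_sqe_for_order/', base + 'head_link_data_clear/',
#       base + 'win_order_xw/', base + 'sqe_arrival_for_link/',
#       base + 'data_for_driver_xw/', base + 'downstream_status_for_order/')

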
def processing_data(data, mk_cols_list, link_cols_list, cross_cols_list, WIDE_COLS, is_test=False):
    """
    Fix up the data: ast.literal_eval + StandardScaler + train_test_split
    :return: train_data and val_data (or the scaled data itself when is_test=True)
    """
    print('Now, starting parallel_apply on the link columns..................')
    for i in tqdm(link_cols_list):
        data[i] = data[i].parallel_apply(ast.literal_eval)
    print('Now, starting parallel_apply on the cross columns..................')
    for i in tqdm(cross_cols_list):
        data[i] = data[i].parallel_apply(ast.literal_eval)
    # data = data.fillna(0)
    # numeric_only=True keeps median() from choking on the list-valued columns
    data.fillna(data.median(numeric_only=True), inplace=True)
    ss_cols = mk_cols_list + WIDE_COLS

    # train, val
    if is_test is True:
        print('is_test is True')
        ss = joblib.load('../model_h5/ss_scaler')
        data[ss_cols] = ss.transform(data[ss_cols])
        return data
    else:
        ss = StandardScaler()
        ss.fit(data[ss_cols])
        data[ss_cols] = ss.transform(data[ss_cols])
        joblib.dump(ss, '../model_h5/ss_scaler')
        print('is_test is False')
        data['date_time'] = data['date_time'].astype(int)
        print("type(data['date_time']):", data['date_time'].dtype)
        # print('Here train_test_split..................')
        # all_train_data, _ = train_test_split(all_train_data, test_size=0.9, random_state=42)
        print('*-' * 40, 'The data.shape:', data.shape)
        train_data, val_data = train_test_split(data, test_size=0.15, random_state=42)
        train_data = train_data.reset_index(drop=True)
        val_data = val_data.reset_index(drop=True)
        return train_data, val_data


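# A hedged sketch of how processing_data() might be called. WIDE_COLS is
# defined by the training script; the column names below are an assumption
# based on the id/label columns excluded in load_data():
#
#   WIDE_COLS = ['weather_le', 'hightemp', 'lowtemp', 'dayofweek']  # assumed
#   train_data, val_data = processing_data(data, mk_cols, link_cols,
#                                          cross_cols, WIDE_COLS)
#   # At inference time the saved '../model_h5/ss_scaler' is reused:
#   test_data = processing_data(test, mk_cols, link_cols, cross_cols,
#                               WIDE_COLS, is_test=True)

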
def processing_inputs(data, mk_cols_list, link_cols_list, cross_cols_list, WIDE_COLS):
    """
    Reshape the data into the inputs for the model.
    :return: link, cross, deep, wide and slice_id inputs, plus the labels
    """
    # Sanity checks: the label must not leak into any feature list
    if 'ata' in mk_cols_list:
        print("The 'ata' is in the mk_cols_list")
    if 'ata' in link_cols_list:
        print("The 'ata' is in the link_cols_list")
    if 'ata' in cross_cols_list:
        print("The 'ata' is in the cross_cols_list")
    if 'ata' in WIDE_COLS:
        print("The 'ata' is in the WIDE_COLS")
    # link_cols_list = ['link_id', 'link_time', 'link_id_count', 'pr', 'dc',
    #                   'top_a', 'link_current_status', 'link_ratio']
    # cross_cols_list = ['cross_id', 'cross_time']
    data_link_inputs = preprocess_inputs(data, cols=link_cols_list)
    data_cross_inputs = preprocess_inputs(data, cols=cross_cols_list)
    data_deep_input = data[mk_cols_list].values
    data_wide_input = data[WIDE_COLS].values
    data_inputs_slice = data['slice_id'].values
    # print('--------------------------------test, ', min(data['slice_id'].values.tolist()))
    data_labels = data['ata'].values

    return data_link_inputs, data_cross_inputs, data_deep_input, data_wide_input, data_inputs_slice, data_labels


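# A hedged end-to-end sketch: processing_inputs() turns a processed frame into
# the six arrays a wide-and-deep style model consumes (the names on the left
# are illustrative):
#
#   (link_in, cross_in, deep_in, wide_in,
#    slice_in, labels) = processing_inputs(train_data, mk_cols, link_cols,
#                                          cross_cols, WIDE_COLS)
#   # link_in has shape (n_orders, seq_len, len(link_cols)); labels is 'ata'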