{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "from tqdm import tqdm\n", "from sklearn.preprocessing import LabelEncoder\n", "from dask.distributed import wait\n", "import glob\n", "\n", "SENSORS = ['acce','acce_uncali','gyro',\n", " 'gyro_uncali','magn','magn_uncali','ahrs']\n", "\n", "NFEAS = {\n", " 'acce': 3,\n", " 'acce_uncali': 3,\n", " 'gyro': 3,\n", " 'gyro_uncali': 3,\n", " 'magn': 3,\n", " 'magn_uncali': 3,\n", " 'ahrs': 3,\n", " 'wifi': 1,\n", " 'ibeacon': 1,\n", " 'waypoint': 3\n", "}\n", "\n", "ACOLS = ['timestamp','x','y','z']\n", " \n", "FIELDS = {\n", " 'acce': ACOLS,\n", " 'acce_uncali': ACOLS,\n", " 'gyro': ACOLS,\n", " 'gyro_uncali': ACOLS,\n", " 'magn': ACOLS,\n", " 'magn_uncali': ACOLS,\n", " 'ahrs': ACOLS,\n", " 'wifi': ['timestamp','ssid','bssid','rssi','last_timestamp'],\n", " 'ibeacon': ['timestamp','code','rssi','last_timestamp'],\n", " 'waypoint': ['timestamp','x','y']\n", "}\n", "\n", "def to_frame(data, col):\n", " cols = FIELDS[col]\n", " is_dummy = False\n", " if data.shape[0]>0:\n", " df = pd.DataFrame(data, columns=cols)\n", " else:\n", " df = create_dummy_df(cols)\n", " is_dummy = True\n", " for col in df.columns:\n", " if 'timestamp' in col:\n", " df[col] = df[col].astype('int64')\n", " return df, is_dummy\n", "\n", "def create_dummy_df(cols):\n", " df = pd.DataFrame()\n", " for col in cols:\n", " df[col] = [0]\n", " if col in ['ssid','bssid']:\n", " df[col] = df[col].map(str)\n", " return df\n", "\n", "from dataclasses import dataclass\n", "\n", "import numpy as np\n", "\n", "\n", "@dataclass\n", "class ReadData:\n", " acce: np.ndarray\n", " acce_uncali: np.ndarray\n", " gyro: np.ndarray\n", " gyro_uncali: np.ndarray\n", " magn: np.ndarray\n", " magn_uncali: np.ndarray\n", " ahrs: np.ndarray\n", " wifi: np.ndarray\n", " ibeacon: np.ndarray\n", " waypoint: np.ndarray\n", "\n", "\n", "def read_data_file(data_filename):\n", " acce = []\n", " acce_uncali = []\n", " gyro = []\n", " gyro_uncali = []\n", " magn = []\n", " magn_uncali = []\n", " ahrs = []\n", " wifi = []\n", " ibeacon = []\n", " waypoint = []\n", "\n", " with open(data_filename, 'r', encoding='utf-8') as file:\n", " lines = file.readlines()\n", "\n", " for line_data in lines:\n", " line_data = line_data.strip()\n", " if not line_data or line_data[0] == '#':\n", " continue\n", "\n", " line_data = line_data.split('\\t')\n", "\n", " if line_data[1] == 'TYPE_ACCELEROMETER':\n", " acce.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])\n", " continue\n", "\n", " if line_data[1] == 'TYPE_ACCELEROMETER_UNCALIBRATED':\n", " acce_uncali.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])\n", " continue\n", "\n", " if line_data[1] == 'TYPE_GYROSCOPE':\n", " gyro.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])\n", " continue\n", "\n", " if line_data[1] == 'TYPE_GYROSCOPE_UNCALIBRATED':\n", " gyro_uncali.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])\n", " continue\n", "\n", " if line_data[1] == 'TYPE_MAGNETIC_FIELD':\n", " magn.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])\n", " continue\n", "\n", " if line_data[1] == 'TYPE_MAGNETIC_FIELD_UNCALIBRATED':\n", " magn_uncali.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])\n", " continue\n", "\n", " if line_data[1] == 'TYPE_ROTATION_VECTOR':\n", " if len(line_data)>=5:\n", " ahrs.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])\n", " continue\n", "\n", " if line_data[1] == 'TYPE_WIFI':\n", " sys_ts = line_data[0]\n", " ssid = line_data[2]\n", " bssid = line_data[3]\n", " rssi = line_data[4]\n", " lastseen_ts = line_data[6]\n", " wifi_data = [sys_ts, ssid, bssid, rssi, lastseen_ts]\n", " wifi.append(wifi_data)\n", " continue\n", "\n", " if line_data[1] == 'TYPE_BEACON':\n", " ts = line_data[0]\n", " uuid = line_data[2]\n", " major = line_data[3]\n", " minor = line_data[4]\n", " rssi = line_data[6]\n", " lastts = line_data[-1]\n", " ibeacon_data = [ts, '_'.join([uuid, major, minor]), rssi, lastts]\n", " ibeacon.append(ibeacon_data)\n", " continue\n", "\n", " if line_data[1] == 'TYPE_WAYPOINT':\n", " waypoint.append([int(line_data[0]), float(line_data[2]), float(line_data[3])])\n", "\n", " acce = np.array(acce)\n", " acce_uncali = np.array(acce_uncali)\n", " gyro = np.array(gyro)\n", " gyro_uncali = np.array(gyro_uncali)\n", " magn = np.array(magn)\n", " magn_uncali = np.array(magn_uncali)\n", " ahrs = np.array(ahrs)\n", " wifi = np.array(wifi)\n", " ibeacon = np.array(ibeacon)\n", " waypoint = np.array(waypoint)\n", "\n", " return ReadData(acce, acce_uncali, gyro, gyro_uncali, magn, magn_uncali, ahrs, wifi, ibeacon, waypoint)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "def get_test_dfs(PATH, test_files):\n", " dtest = get_test_df(PATH)\n", " buildings = set(dtest['building'].values.tolist())\n", " dws = {}\n", " ntest_files = []\n", " for fname in tqdm(test_files):\n", " path = fname.split('/')[-1].split('.')[0]\n", " mask = dtest['path'] == path\n", " dws[fname] = dtest.loc[mask, ['timestamp','x','y','floor','building','site_path_timestamp']].copy().reset_index(drop=True)\n", " ntest_files.append(fname)\n", " return dws\n", "\n", "def get_test_df(PATH):\n", " dtest = pd.read_csv(f'{PATH}/sample_submission.csv')\n", " dtest['building'] = dtest['site_path_timestamp'].apply(lambda x: x.split('_')[0])\n", " dtest['path'] = dtest['site_path_timestamp'].apply(lambda x: x.split('_')[1])\n", " dtest['timestamp'] = dtest['site_path_timestamp'].apply(lambda x: x.split('_')[2])\n", " dtest['timestamp'] = dtest['timestamp'].astype('int64')\n", " dtest = dtest.sort_values(['path','timestamp']).reset_index(drop=True)\n", " return dtest\n", "\n", "def get_time_gap(name):\n", " data = read_data_file(name)\n", " db,no_ibeacon = to_frame(data.ibeacon,'ibeacon')\n", "# print(db,no_ibeacon)\n", " \n", " if no_ibeacon==0:\n", " gap = db['last_timestamp'] - db['timestamp']\n", " assert gap.unique().shape[0]==1\n", " return gap.values[0],no_ibeacon\n", " \n", " if no_ibeacon==1:\n", " # Group wifis by timestamp\n", " wifi_groups = pd.DataFrame(data.wifi).groupby(0) \n", " # Find which one is the most recent of all time points.\n", " est_ts = (wifi_groups[4].max().astype(int) - wifi_groups[0].max().astype(int)).max() \n", " return est_ts,no_ibeacon\n", "\n", " \n", "\n", "def fix_timestamp_test(df, gap):\n", " df['real_timestamp'] = df['timestamp'] + gap\n", " return df" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['../input/indoor-location-navigation/test/00ff0c9a71cc37a2ebdd0f05.txt',\n", " '../input/indoor-location-navigation/test/01c41f1aeba5c48c2c4dd568.txt',\n", " '../input/indoor-location-navigation/test/030b3d94de8acae7c936563d.txt',\n", " '../input/indoor-location-navigation/test/0389421238a7e2839701df0f.txt']" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "test_files_ori = glob.glob('../input/indoor-location-navigation/test/*.txt')\n", "test_files_ori[:4]" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/home/ec2-user/anaconda3/lib/python3.7/site-packages/distributed/dashboard/core.py:79: UserWarning: \n", "Port 8787 is already in use. \n", "Perhaps you already have a cluster running?\n", "Hosting the diagnostics dashboard on a random port instead.\n", " warnings.warn(\"\\n\" + msg)\n" ] }, { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "

Client

\n", "\n", "
\n", "

Cluster

\n", "
    \n", "
  • Workers: 8
  • \n", "
  • Cores: 8
  • \n", "
  • Memory: 32.89 GB
  • \n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask\n", "from dask.distributed import Client, wait, LocalCluster\n", "\n", "# set n_workers to number of cores\n", "client = Client(n_workers=8, \n", " threads_per_worker=1)\n", "client" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "100%|██████████| 626/626 [00:00<00:00, 4552.03it/s]\n", "100%|██████████| 626/626 [00:16<00:00, 37.39it/s] \n" ] } ], "source": [ "futures = []\n", "for fname in tqdm(test_files_ori, total=len(test_files_ori)):\n", " f = client.submit(get_time_gap,fname)\n", " futures.append(f)\n", " \n", "testpath2gap = {}\n", "for f,fname in tqdm(zip(futures, test_files_ori), total=len(test_files_ori)):\n", " testpath2gap[fname.split('/')[-1].replace('.txt','')] = f.result()\n", " " ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "import pickle\n", "with open('testpath2gap.pkl','wb') as f:\n", " pickle.dump(testpath2gap,f)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.7" } }, "nbformat": 4, "nbformat_minor": 4 }