In this notebook, I'd like to share how to label encode all wifi files in parallel using dask. On my workstation with 20 cpu cores, it took only 10 mins to encode all the data. Due to the limitation of kaggle kernel, I have to set number of workers to 2. 

In [1]:
! ls ../input/indoor-location-navigation

metadata  sample_submission.csv  test  train


In [2]:
from glob import glob
import os
import sys
from sklearn.preprocessing import LabelEncoder

import numpy as np
import pandas as pd
from tqdm import tqdm

In [3]:
import pandas as pd
import numpy as np
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
from glob import glob
from dask.distributed import wait

SENSORS = ['acce','acce_uncali','gyro',
           'gyro_uncali','magn','magn_uncali','ahrs']

NFEAS = {
    'acce': 3,
    'acce_uncali': 3,
    'gyro': 3,
    'gyro_uncali': 3,
    'magn': 3,
    'magn_uncali': 3,
    'ahrs': 3,
    'wifi': 1,
    'ibeacon': 1,
    'waypoint': 3
}

ACOLS = ['timestamp','x','y','z']
        
FIELDS = {
    'acce': ACOLS,
    'acce_uncali': ACOLS,
    'gyro': ACOLS,
    'gyro_uncali': ACOLS,
    'magn': ACOLS,
    'magn_uncali': ACOLS,
    'ahrs': ACOLS,
    'wifi': ['timestamp','ssid','bssid','rssi','last_timestamp'],
    'ibeacon': ['timestamp','code','rssi'],
    'waypoint': ['timestamp','x','y']
}

def to_frame(data, col):
    cols = FIELDS[col]
    if data.shape[0]>0:
        df = pd.DataFrame(data, columns=cols)
    else:
        df = create_dummy_df(cols)
    for col in df.columns:
        if 'timestamp' in col:
            df[col] = df[col].astype('int64')
    return df

def create_dummy_df(cols):
    df = pd.DataFrame()
    for col in cols:
        df[col] = [0]
        if col in ['ssid','bssid']:
            df[col] = df[col].map(str)
    return df

def get_building_floor(fname):
    xx = fname.split('/')
    return xx[-3],xx[-2]

def get_test_building(name):
    with open(name) as f:
        for c,line in enumerate(f):
            if c==1:
                x = line.split()[1].split(':')[1]
                return x  

def get_floor_target(floor):
    floor = floor.lower()
    if floor in ['bf','bm']:
        return None
    elif floor == 'b':
        return -1
    if floor.startswith('f'):
        return int(floor[1])-1
    elif floor.endswith('f'):
        return int(floor[0])-1
    elif floor.startswith('b'):
        return -int(floor[1])
    elif floor.endswith('b'):
        return -int(floor[0])
    else:
        return None


In [4]:
from dataclasses import dataclass

import numpy as np


@dataclass
class ReadData:
    acce: np.ndarray
    acce_uncali: np.ndarray
    gyro: np.ndarray
    gyro_uncali: np.ndarray
    magn: np.ndarray
    magn_uncali: np.ndarray
    ahrs: np.ndarray
    wifi: np.ndarray
    ibeacon: np.ndarray
    waypoint: np.ndarray


def read_data_file(data_filename):
    acce = []
    acce_uncali = []
    gyro = []
    gyro_uncali = []
    magn = []
    magn_uncali = []
    ahrs = []
    wifi = []
    ibeacon = []
    waypoint = []

    with open(data_filename, 'r', encoding='utf-8') as file:
        lines = file.readlines()

    for line_data in lines:
        line_data = line_data.strip()
        if not line_data or line_data[0] == '#':
            continue

        line_data = line_data.split('\t')

        if line_data[1] == 'TYPE_ACCELEROMETER':
            acce.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])
            continue

        if line_data[1] == 'TYPE_ACCELEROMETER_UNCALIBRATED':
            acce_uncali.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])
            continue

        if line_data[1] == 'TYPE_GYROSCOPE':
            gyro.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])
            continue

        if line_data[1] == 'TYPE_GYROSCOPE_UNCALIBRATED':
            gyro_uncali.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])
            continue

        if line_data[1] == 'TYPE_MAGNETIC_FIELD':
            magn.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])
            continue

        if line_data[1] == 'TYPE_MAGNETIC_FIELD_UNCALIBRATED':
            magn_uncali.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])
            continue

        if line_data[1] == 'TYPE_ROTATION_VECTOR':
            if len(line_data)>=5:
                ahrs.append([int(line_data[0]), float(line_data[2]), float(line_data[3]), float(line_data[4])])
            continue

        if line_data[1] == 'TYPE_WIFI':
            sys_ts = line_data[0]
            ssid = line_data[2]
            bssid = line_data[3]
            rssi = line_data[4]
            lastseen_ts = line_data[6]
            wifi_data = [sys_ts, ssid, bssid, rssi, lastseen_ts]
            wifi.append(wifi_data)
            continue

        if line_data[1] == 'TYPE_BEACON':
            ts = line_data[0]
            uuid = line_data[2]
            major = line_data[3]
            minor = line_data[4]
            rssi = line_data[6]
            ibeacon_data = [ts, '_'.join([uuid, major, minor]), rssi]
            ibeacon.append(ibeacon_data)
            continue

        if line_data[1] == 'TYPE_WAYPOINT':
            waypoint.append([int(line_data[0]), float(line_data[2]), float(line_data[3])])

    acce = np.array(acce)
    acce_uncali = np.array(acce_uncali)
    gyro = np.array(gyro)
    gyro_uncali = np.array(gyro_uncali)
    magn = np.array(magn)
    magn_uncali = np.array(magn_uncali)
    ahrs = np.array(ahrs)
    wifi = np.array(wifi)
    ibeacon = np.array(ibeacon)
    waypoint = np.array(waypoint)

    return ReadData(acce, acce_uncali, gyro, gyro_uncali, magn, magn_uncali, ahrs, wifi, ibeacon, waypoint)


In [5]:
import dask
from dask.distributed import Client, wait, LocalCluster

In [6]:
# set n_workers to number of cores
client = Client(n_workers=2, 
                threads_per_worker=1)
client

0,1
Client  Scheduler: tcp://127.0.0.1:40123  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 2  Memory: 17.18 GB


### Label encode wifi ID features

In [7]:
PATH = '../input/indoor-location-navigation'
OUT = '../input/wifi_lbl_encode'
os.mkdir(OUT)

In [8]:
def read_wifi_ssid_bssid(name):
    data = read_data_file(name)
    dw = to_frame(data.wifi,'wifi')
    ss = dw['ssid'].unique()
    bs = dw['bssid'].unique()
    return ss,bs

def encode_wifi_ssid_bssid(name, lbl_ssid, lbl_bssid):
    data = read_data_file(name)
    dw = to_frame(data.wifi,'wifi')
    dw['ssid'] = lbl_ssid.transform(dw['ssid'])
    dw['bssid'] = lbl_bssid.transform(dw['bssid'])
    name = name.replace(PATH,OUT)
    dw.to_csv(name,index=False)
    return dw

### Run

In [9]:
train_files = glob(f'{PATH}/train/*/*/*.txt')
test_files = glob(f'{PATH}/test/*.txt')
len(test_files),len(train_files)

(626, 26925)

In [10]:
name = train_files[0]
ss,bs = read_wifi_ssid_bssid(name)
type(ss),ss.shape,bs.shape,name

(numpy.ndarray,
 (19,),
 (19,),
 '../input/indoor-location-navigation/train/5cd56c0ce2acfd2d33b6ab27/B1/5d09a625bd54340008acddb9.txt')

In [11]:
os.mkdir(f'{OUT}/train')
os.mkdir(f'{OUT}/test')
for site in os.listdir(f'{PATH}/train'):
    os.mkdir(f'{OUT}/train/{site}')
    for floor in os.listdir(f'{PATH}/train/{site}'):
        os.mkdir(f'{OUT}/train/{site}/{floor}')

In [12]:
%%time
buildings = []
floors = []
used = []
for fname in tqdm(train_files):
    b,f = get_building_floor(fname)
    f = get_floor_target(f)
    if f is None:
        continue
    used.append(fname)
    buildings.append(b)
    floors.append(f)

100%|██████████| 26925/26925 [00:00<00:00, 288117.37it/s]

CPU times: user 97.7 ms, sys: 2.84 ms, total: 101 ms
Wall time: 101 ms





In [13]:
%%time
futures = [] # save the future since dask is lazy, otherwise nothing is executed.
for fname in tqdm(test_files+used):
    f = client.submit(read_wifi_ssid_bssid,fname) 
    futures.append(f) 
_ = wait(futures)

100%|██████████| 24732/24732 [00:11<00:00, 2222.39it/s]


CPU times: user 39min 16s, sys: 1min 40s, total: 40min 57s
Wall time: 1h 11min 28s


In [14]:
%%time
ss = []
bs = []
for f in tqdm(futures):
    s,b = f.result()
    ss.append(s)
    bs.append(b)
ss = np.concatenate(ss)
bs = np.concatenate(bs)
ss.shape, bs.shape

100%|██████████| 24732/24732 [01:40<00:00, 246.25it/s]


CPU times: user 1min 9s, sys: 9.36 s, total: 1min 18s
Wall time: 1min 40s


((1536700,), (4858800,))

In [15]:
%%time

lbl_ssid = LabelEncoder()
lbl_ssid.fit(ss)
print(lbl_ssid.classes_.shape)
lbl_ssid.classes_

(74246,)
CPU times: user 283 ms, sys: 6.93 ms, total: 290 ms
Wall time: 289 ms


array(['0', '000015ed38fdbc763149432d7ba3b7bd208461d3',
       '000073084baa0ea60776e25c07c6ee6b988aa072', ...,
       'fffe51167f1ad1bf26dda45ccfc40b5d7fab8384',
       'fffe9c57a9b25623ac219260c1b5155087a788e9',
       'ffff286949cbddd576ed8b2d5e16ce30eab87af6'], dtype=object)

In [16]:
%%time

lbl_bssid = LabelEncoder()
lbl_bssid.fit(bs)
print(lbl_bssid.classes_.shape)
lbl_bssid.classes_

(216210,)
CPU times: user 948 ms, sys: 9.79 ms, total: 958 ms
Wall time: 942 ms


array(['0', '00001d7b6fbf0a24da65285b686b03c6e796962a',
       '0000fe40d201cfc6cada502b07f29883cd17fe4a', ...,
       'ffff295a9e6ae90d9de67a1e1939ba969b765259',
       'ffff2c098362b016764229a1bb5e06d10cf0d895',
       'ffffb8116ceb5c0326ec2eb039028ec71ffdfbab'], dtype=object)

Let's look at one encode result.

In [17]:
encode_wifi_ssid_bssid(name, lbl_ssid, lbl_bssid)
name = name.replace(PATH,OUT)
df = pd.read_csv(name)
df.head()

Unnamed: 0,timestamp,ssid,bssid,rssi,last_timestamp
0,1560913370116,54774,69149,-90,1560913363914
1,1560913370116,18591,182228,-80,1560913363973
2,1560913370116,54874,109727,-87,1560913354584
3,1560913370116,36901,124351,-85,1560913351034
4,1560913370116,51444,114314,-84,1560913363543


In [18]:
%%time

lbl_ssid_f = client.scatter(lbl_ssid)
lbl_bssid_f = client.scatter(lbl_bssid)
_ = wait(lbl_ssid_f)
_ = wait(lbl_bssid_f)

CPU times: user 284 ms, sys: 34 ms, total: 317 ms
Wall time: 2.6 s


In [19]:
%%time
futures = [] # save the future since dask is lazy, otherwise nothing is executed.

for fname in tqdm(test_files+used):
    f = client.submit(encode_wifi_ssid_bssid, fname, lbl_ssid_f, lbl_bssid_f)
    futures.append(f) 
_ = wait(futures)

100%|██████████| 24732/24732 [00:17<00:00, 1415.46it/s]


CPU times: user 54min 44s, sys: 2min 6s, total: 56min 50s
Wall time: 2h 1min 49s
