"# Start (or reuse) a local Spark session for the exploration\n",
"session_builder = SparkSession.builder.master(\"local\").appName(\"Kiva Exploration\")\n",
"sparkSql = session_builder.getOrCreate()\n",
"\n",
"# Read each Kiva JSON dump into its own Spark DataFrame\n",
"loans = sparkSql.read.format('json').load('kiva-data/loans.json')\n",
"lenders = sparkSql.read.format('json').load('kiva-data/lenders.json')\n",
"loans_lenders = sparkSql.read.format('json').load('kiva-data/loans_lenders.json')\n",
"# Custom Functions\n",
"## Gender Ratio\n",
"Share of a loan's borrowers who are male:\n",
"- 0 = all female\n",
"- 1 = all male"
"import pyspark\n",
"def gender_ratio(array):\n",
"    \"\"\"Return the share of entries in `array` whose `gender` is 'M'.\n",
"\n",
"    0 means no entry is male, 1 means every entry is male. Returns None\n",
"    when `array` is None or empty, because the ratio is undefined there\n",
"    (the plain division would raise ZeroDivisionError).\n",
"    \"\"\"\n",
"    if not array:\n",
"        return None\n",
"    num_males = sum(1 for item in array if item.gender == 'M')\n",
"    return float(num_males) / len(array)\n",
" gender_ratio,\n",
" pyspark.sql.types.FloatType())"
"## Fetch GDP"
"import pandas as pd\n",
"from datetime import datetime\n",
"import numpy as np\n",
"# Load country info data\n",
"# keep_default_na=False stops pandas from parsing the literal string 'NA'\n",
"# (Namibia's ISO 3166-1 alpha-2 code) as a missing value; empty cells are\n",
"# still read as NaN through na_values.\n",
"country_codes_raw = pd.read_csv('economic-data/country-codes.csv',\n",
"                                keep_default_na=False, na_values=[''])\n",
"country_gdp_raw = pd.read_csv('economic-data/country-gdp.csv')\n",
"# Clean country codes data: keep only the name, ISO codes and currency code\n",
"country_codes = country_codes_raw[['official_name_en', 'ISO3166-1-Alpha-2',\n",
"                                   'ISO3166-1-Alpha-3', 'ISO4217-currency_alphabetic_code']]\n",
"# Clean gdp data: drop the first two columns and name the remaining ones\n",
"country_gdp = country_gdp_raw.drop(country_gdp_raw.columns[[0, 1]], axis=1)\n",
"country_gdp.columns = ['name', 'country_code_3', '2002', '2003', '2004', '2005', '2006',\n",
"                       '2007', '2008', '2009', '2010', '2011', '2012', '2013', '2014', '2015', '2016']\n",
"# Merge gdp and code (left join on the alpha-3 code keeps every GDP row)\n",
"country_gdp = pd.merge(country_gdp, country_codes, left_on='country_code_3', right_on='ISO3166-1-Alpha-3', how='left')\n",
"country_gdp.drop(['official_name_en', 'ISO3166-1-Alpha-3', 'country_code_3'], axis=1, inplace=True)\n",
"country_gdp = country_gdp.rename(columns = {'ISO3166-1-Alpha-2':'country_code',\n",
"                                            'ISO4217-currency_alphabetic_code':'currency_code'})\n",
"# The source marks missing figures with '..'\n",
"country_gdp.replace('..', np.nan, inplace=True)\n",
"# Reorder columns so the layout is: name, country_code, currency_code, years...\n",
"cols = list(country_gdp.columns)\n",
"cols.insert(1, cols.pop(cols.index('country_code')))\n",
"cols.insert(2, cols.pop(cols.index('currency_code')))\n",
"country_gdp = country_gdp.reindex(columns= cols)\n",
"def gdp(country_code, disbursal_date):\n",
"    \"\"\"Look up a country's GDP figure for the year a loan was disbursed.\n",
"\n",
"    Reads the module-level `country_gdp` table.\n",
"\n",
"    Args:\n",
"        country_code: value matched against `country_gdp.country_code`.\n",
"        disbursal_date: string formatted as '%Y-%m-%dT%H:%M:%SZ', or None.\n",
"\n",
"    Returns:\n",
"        0.0 when `country_code` is numeric, None or not in the table.\n",
"        Otherwise the value in the disbursal year's column; when the date\n",
"        is None, the year has no column, or the value is missing, the mean\n",
"        of the country's available yearly values (0.0 if there are none).\n",
"        Results are always Python floats so they match the FloatType the\n",
"        UDF is registered with.\n",
"    \"\"\"\n",
"    def historical_gdp(array):\n",
"        # A list comprehension (not map) so this also works on Python 3,\n",
"        # where map() returns an iterator that np.array cannot unpack.\n",
"        array = np.array([float(value) for value in array])\n",
"        array = array[~np.isnan(array)]  # Remove NaN\n",
"        if len(array) == 0:  # No GDP values\n",
"            return 0.0\n",
"        return float(np.mean(array, dtype=np.float64))\n",
"\n",
"    # TODO: Unable to resolve country code WorldBank dataset has wrong alpha 3 codes e.g. Andorra causing issues\n",
"    try:\n",
"        float(country_code)  # A numeric value (e.g. NaN) is not a usable code\n",
"        return 0.0\n",
"    except (TypeError, ValueError):\n",
"        if country_code not in list(country_gdp['country_code']):\n",
"            return 0.0  # TODO: Bad solution ?\n",
"\n",
"    # Get the historical average GDP if no disbursal date\n",
"    country_rows = country_gdp[country_gdp.country_code == country_code]\n",
"    all_gdp = country_rows.values[0][3:]  # Columns after name and the two codes\n",
"    if disbursal_date is None:\n",
"        return historical_gdp(all_gdp)\n",
"\n",
"    date = str(datetime.strptime(disbursal_date, '%Y-%m-%dT%H:%M:%SZ').year)\n",
"    # Get the historical average GDP if the table has no column for that year\n",
"    # (indexing it directly would raise KeyError)\n",
"    if date not in country_gdp.columns:\n",
"        return historical_gdp(all_gdp)\n",
"\n",
"    # Get the historical average GDP if no GDP for that year\n",
"    year_gdp = country_rows[date].values[0]\n",
"    if pd.isnull(year_gdp):\n",
"        return historical_gdp(all_gdp)\n",
"\n",
"    return float(year_gdp)\n",
"sparkSql.udf.register('gdp', gdp, pyspark.sql.types.FloatType())"
"## Fetch Exchange Rates"
"currencies_raw = pd.read_csv('economic-data/currencies.csv')\n",
"# Cleanup: drop the first two columns of the currencies file itself\n",
"# (previously the column labels were taken from country_gdp_raw)\n",
"currencies = currencies_raw.drop(currencies_raw.columns[[0, 1]], axis=1)\n",
"currencies.columns = ['country_name', 'country_code_3', '2002', '2003', '2004', '2005', '2006',\n",
"                      '2007', '2008', '2009', '2010', '2011', '2012', '2013', '2014', '2015', '2016']\n",
"# Get ISO 2 code\n",
"currencies = pd.merge(currencies, country_codes, left_on='country_code_3', right_on='ISO3166-1-Alpha-3', how='left')\n",
"currencies.drop(['official_name_en', 'ISO3166-1-Alpha-3', 'country_code_3'], axis=1, inplace=True)\n",
"currencies = currencies.rename(columns = {'ISO3166-1-Alpha-2':'country_code',\n",
"                                          'ISO4217-currency_alphabetic_code':'currency_code'})\n",
"# The source marks missing figures with '..'\n",
"currencies.replace('..', np.nan, inplace=True)\n",
"# Add code for European Union\n",
"# (.at replaces DataFrame.set_value, which was removed in pandas 1.0)\n",
"currencies.at[217, 'country_code'] = 'EU'\n",
"currencies.at[217, 'currency_code'] = 'EMU'\n",
"# Reorder columns so the layout is: country_name, country_code, currency_code, years...\n",
"cols = list(currencies.columns)\n",
"cols.insert(1, cols.pop(cols.index('country_code')))\n",
"cols.insert(2, cols.pop(cols.index('currency_code')))\n",
"currencies = currencies.reindex(columns=cols)\n",
"def xchange_rate(country_code, disbursal_date):\n",
"    \"\"\"Look up a country's exchange rate for the year a loan was disbursed.\n",
"\n",
"    Reads the module-level `currencies` table. Codes listed in `eu` are\n",
"    looked up under 'EU' and codes listed in `us` under 'US'.\n",
"\n",
"    Args:\n",
"        country_code: value matched against `currencies.country_code`.\n",
"        disbursal_date: string formatted as '%Y-%m-%dT%H:%M:%SZ', or None.\n",
"\n",
"    Returns:\n",
"        1.0 when `country_code` is numeric, None or not in the table.\n",
"        Otherwise the value in the disbursal year's column; when the date\n",
"        is None, the year has no column, or the value is missing, the mean\n",
"        of the country's available yearly values (1.0 if there are none).\n",
"        Results are always Python floats so they match the FloatType the\n",
"        UDF is registered with.\n",
"    \"\"\"\n",
"    def historical_rates(array):\n",
"        # A list comprehension (not map) so this also works on Python 3,\n",
"        # where map() returns an iterator that np.array cannot unpack.\n",
"        array = np.array([float(value) for value in array])\n",
"        array = array[~np.isnan(array)]  # Remove NaN\n",
"        if len(array) == 0:  # No rate values\n",
"            return 1.0\n",
"        return float(np.mean(array, dtype=np.float64))\n",
"\n",
"    # NOTE(review): this list also contains countries that do not use the\n",
"    # euro (e.g. GB, SE, DK, PL) -- confirm mapping them to 'EU' is intended.\n",
"    eu = ['AT','BE','BG','HR','CY','CZ','DK','EE','FI','FR','DE','GR','HU','IE',\n",
"          'IT','LV','LT','LU','MT','NL','PL','PT','RO','SK','SI','ES','SE','GB']\n",
"    us = ['AS','GU','MP','PR','UM','VI']\n",
"    # TODO: Unable to resolve country code WorldBank dataset has wrong alpha 3 codes e.g. Andorra causing\n",
"    try:\n",
"        float(country_code)  # Country code unknown?\n",
"        # Any numeric value (NaN included) is not a usable code. A second,\n",
"        # copy-pasted check used to return 0 for non-NaN numbers; unknown\n",
"        # codes now consistently get the neutral rate 1.0.\n",
"        return 1.0  # TODO: Bad solution ??\n",
"    except (TypeError, ValueError):\n",
"        if country_code in eu:\n",
"            country_code = 'EU'\n",
"        elif country_code in us:\n",
"            country_code = 'US'\n",
"        if country_code not in list(currencies['country_code']):\n",
"            return 1.0\n",
"\n",
"    # Get the historical average exchange rate if no disbursal date\n",
"    country_rows = currencies[currencies.country_code == country_code]\n",
"    all_rates = country_rows.values[0][3:]  # Columns after name and the two codes\n",
"    if disbursal_date is None:\n",
"        return historical_rates(all_rates)\n",
"\n",
"    date = str(datetime.strptime(disbursal_date, '%Y-%m-%dT%H:%M:%SZ').year)\n",
"    # Get the historical average exchange rate if the table has no column\n",
"    # for that year (indexing it directly would raise KeyError)\n",
"    if date not in currencies.columns:\n",
"        return historical_rates(all_rates)\n",
"\n",
"    # Get the historical average exchange rate if no rate for that year\n",
"    year_rate = country_rows[date].values[0]\n",
"    if pd.isnull(year_rate):\n",
"        return historical_rates(all_rates)\n",
"\n",
"    return float(year_rate)\n",
"sparkSql.udf.register('xchange_rate', xchange_rate, pyspark.sql.types.FloatType())"
"# Fetch actual data\n",
"Fetch all the data we are going to use, one-hot encode the categorical columns (dummies), then split the result into train/validation/test sets."
"Query our datasets to train on."
"query = '''\n",
" id,\n",
" activity,\n",
" size(borrowers) as num_borrowers,\n",
" gender_ratio(borrowers) as gender_ratio,\n",
" lender_count,\n",
" location.country,\n",
" location.country_code,\n",
" partner_id,\n",
" sector,\n",
" tags,\n",
" DATEDIFF(terms.disbursal_date, planned_expiration_date) as loan_length,\n",
" terms.disbursal_amount,\n",
" terms.disbursal_currency,\n",
" terms.disbursal_date,\n",
" size(terms.scheduled_payments) as num_repayments,\n",
" terms.repayment_interval,\n",
" (status = 'refunded') OR\n",
" (status = 'defaulted') OR\n",
" (status = 'deleted') OR\n",
" (status = 'issue') OR\n",
" (status = 'inactive_expired') OR\n",
" (status = 'expired') OR\n",
" (status = 'inactive') OR\n",
" (delinquent = True) THEN 1 ELSE 0 END AS bad_loan,\n",
" gdp(location.country_code, terms.disbursal_date) as gdp,\n",
" xchange_rate(location.country_code, terms.disbursal_date) as xchange_rate,\n",
" status,\n",
" delinquent\n",
" \n",
"FROM loans\n",
" status != 'fundraising' AND\n",
" status != 'funded'\n",
"dataset = sparkSql.sql(query).toPandas()"
"# Data Splits"
"X_columns = [\n",
" 'activity', 'num_borrowers', 'gender_ratio',\n",
" 'lender_count', 'country', 'partner_id', 'sector',\n",
" 'loan_length', 'disbursal_amount', 'disbursal_currency',\n",
" 'num_repayments', 'repayment_interval', 'gdp', 'xchange_rate'\n",
"y_column = ['bad_loan']\n",
"dummy_set = pd.get_dummies(dataset[X_columns + y_column])\n",
"Now we can restart the kernel to clear memory, and start processing."
"import pandas as pd\n",
"processed_dummy = pd.read_csv('processed_dummy.csv', index_col=0)"
"import numpy as np\n",
"# Shuffle reproducibly, then cut at 60% and 80% for train / validate / test\n",
"shuffled = processed_dummy.sample(frac=1, random_state=0)\n",
"n_rows = len(processed_dummy)\n",
"split_points = [int(.6*n_rows), int(.8*n_rows)]\n",
"train, validate, test = np.split(shuffled, split_points)\n",
"# Testing all the models"
"import pandas as pd\n",
"train = pd.read_csv('processed_train.csv', index_col=0)\n",
"valid = pd.read_csv('processed_validate.csv', index_col=0)\n",
"# Keep only columns that are present and fully populated in BOTH splits.\n",
"# Calling dropna(axis=1) on each frame separately can leave train and valid\n",
"# with different feature columns, which breaks fitting on one and scoring\n",
"# on the other.\n",
"shared_columns = [column for column in train.columns\n",
"                  if column in valid.columns\n",
"                  and not train[column].isnull().any()\n",
"                  and not valid[column].isnull().any()]\n",
"train = train[shared_columns]\n",
"valid = valid[shared_columns]"
"Naive guess:"
"# Separate the bad_loan target from the feature columns for both splits\n",
"train_y, valid_y = train['bad_loan'], valid['bad_loan']\n",
"train_x = train.loc[:, train.columns != 'bad_loan']\n",
"valid_x = valid.loc[:, valid.columns != 'bad_loan']\n",
"# One minus the mean of the bad_loan training target\n",
"1 - train_y.mean()"
"from itertools import product\n",
"import pickle\n",
"from sklearn.svm import SVC\n",
"# Try every combination of C and gamma, saving each fitted model\n",
"svc_params = product([1, .5, 1.5], [.001, .01, .1])\n",
"for C, gamma in svc_params:\n",
"    svc = SVC(C=C, gamma=gamma)\n",
"    svc.fit(train_x, train_y)\n",
"    # pickle writes bytes, so the file must be opened in binary mode\n",
"    with open('svc_{}_{}.pickle'.format(C, gamma), 'wb') as handle:\n",
"        pickle.dump(svc, handle)\n",
"\n",
"    # Score on the validation split, like the other model loops, rather\n",
"    # than on the data the model was fitted to\n",
"    print(\"C: {}; gamma: {}; score: {}\".format(\n",
"        C, gamma, svc.score(valid_x, valid_y)))"
"from sklearn.discriminant_analysis import LinearDiscriminantAnalysis\n",
"# Number of columns is 342\n",
"# NOTE(review): scikit-learn documents n_components as limited to\n",
"# min(n_classes - 1, n_features); if bad_loan is binary these values exceed\n",
"# that limit -- confirm the intended settings.\n",
"for n_components in [342, 250, 150, 75]:\n",
"    lda = LinearDiscriminantAnalysis(n_components=n_components)\n",
"    lda.fit(train_x, train_y)\n",
"    # pickle writes bytes, so the file must be opened in binary mode\n",
"    with open('lda_{}.pickle'.format(n_components), 'wb') as handle:\n",
"        pickle.dump(lda, handle)\n",
"\n",
"    print(\"N_components: {}; score: {}\".format(\n",
"        n_components, lda.score(valid_x, valid_y)))"
"from sklearn.ensemble import RandomForestClassifier\n",
"# Fit one random forest per ensemble size, saving each fitted model\n",
"for n_estimators in [10, 50, 75, 100]:\n",
"    rf = RandomForestClassifier(n_estimators=n_estimators)\n",
"    rf.fit(train_x, train_y)\n",
"    # pickle writes bytes, so the file must be opened in binary mode\n",
"    with open('rf_{}.pickle'.format(n_estimators), 'wb') as handle:\n",
"        pickle.dump(rf, handle)\n",
"\n",
"    # score is a method of the fitted model; the bare name was undefined\n",
"    print(\"N_estimators: {}; score: {}\".format(\n",
"        n_estimators, rf.score(valid_x, valid_y)))"
