In [1]:
# SparkSession is imported explicitly so this first cell also runs on a plain
# Python kernel (outside the pyspark shell, nothing pre-imports it).
from pyspark.sql import SparkSession

# Single local Spark session shared by every cell below.
sparkSql = (
    SparkSession.builder
    .master("local")
    .appName("Kiva Exploration")
    .getOrCreate()
)

# Load each raw Kiva JSON dump and expose it to Spark SQL under the same name.
# createOrReplaceTempView is the non-deprecated replacement for
# registerTempTable and can be re-run without error.
loans = sparkSql.read.format('json').load('kiva-data/loans.json')
loans.createOrReplaceTempView('loans')
lenders = sparkSql.read.format('json').load('kiva-data/lenders.json')
lenders.createOrReplaceTempView('lenders')
loans_lenders = sparkSql.read.format('json').load('kiva-data/loans_lenders.json')
loans_lenders.createOrReplaceTempView('loans_lenders')

In [2]:
# Print the schema Spark inferred for loans.json (nested structs and arrays included).
loans.printSchema()

root
 |-- activity: string (nullable = true)
 |-- basket_amount: long (nullable = true)
 |-- bonus_credit_eligibility: boolean (nullable = true)
 |-- borrowers: array (nullable = true)
 | |-- element: struct (containsNull = true)
 | | |-- first_name: string (nullable = true)
 | | |-- gender: string (nullable = true)
 | | |-- last_name: string (nullable = true)
 | | |-- pictured: boolean (nullable = true)
 |-- currency_exchange_loss_amount: double (nullable = true)
 |-- delinquent: boolean (nullable = true)
 |-- description: struct (nullable = true)
 | |-- languages: array (nullable = true)
 | | |-- element: string (containsNull = true)
 | |-- texts: struct (nullable = true)
 | | |-- ar: string (nullable = true)
 | | |-- en: string (nullable = true)
 | | |-- es: string (nullable = true)
 | | |-- fr: string (nullable = true)
 | | |-- id: string (nullable = true)
 | | |-- mn: string (nullable = true)
 | | |-- pt: string (nullable = true)
 | | |-- ru: string (nullable = true)
 | | |-- vi: 

In [12]:
import pyspark

def male_proportion(array):
    """Return the fraction of borrowers whose ``gender`` is ``'M'``.

    Args:
        array: sequence of borrower records exposing a ``gender`` attribute
            (the ``borrowers`` column of a loan). May be ``None`` or empty.

    Returns:
        A float in [0.0, 1.0]. ``0.0`` is returned when there are no
        borrowers, instead of raising ZeroDivisionError / TypeError (which
        would abort the whole Spark job when used as a UDF).
    """
    if not array:
        return 0.0

    # The schema allows null elements; they count as borrowers (denominator)
    # but can never match 'M'.
    num_males = sum(1 for item in array if getattr(item, 'gender', None) == 'M')
    return float(num_males) / len(array)

# Make the helper callable from Spark SQL as male_proportion(...); the
# resulting column is a FLOAT.
sparkSql.udf.register(
    'male_proportion', male_proportion, returnType=pyspark.sql.types.FloatType()
)

# Seeded 60/20/20 split of the raw loans (seed 101). Each part is exposed to
# Spark SQL under its own view name so `query` can be pointed at any of them.
train, validation, test = loans.randomSplit([.6, .2, .2], 101)
train.createOrReplaceTempView('loans_train')
validation.createOrReplaceTempView('loans_validation')
test.createOrReplaceTempView('loans_test')

# Feature-extraction query. `{}` is filled with one of the view names above
# via query.format(...). It calls the `male_proportion`, `gdp` and
# `xchange_rate` UDFs, so all three must be registered before it is executed.
# Loans still 'fundraising' or 'funded' are excluded; `bad_loan` is 1 for the
# listed statuses or when `delinquent` is true.
# NOTE(review): DATEDIFF(end, start) is end - start, so `loan_length` is
# disbursal_date minus planned_expiration_date and comes out negative in the
# sample output (e.g. -59) -- confirm this is the intended sign/definition.
query = '''
SELECT
 id,
 activity,
 size(borrowers) as num_borrowers,
 male_proportion(borrowers) as male_proportion,
 lender_count,
 location.country,
 location.country_code,
 partner_id,
 sector,
 tags,
 DATEDIFF(terms.disbursal_date, planned_expiration_date) as loan_length,
 terms.disbursal_amount,
 terms.disbursal_currency,
 terms.disbursal_date,
 size(terms.scheduled_payments) as num_repayments,
 terms.repayment_interval,
 CASE WHEN
 (status = 'refunded') OR
 (status = 'defaulted') OR
 (status = 'deleted') OR
 (status = 'issue') OR
 (status = 'inactive_expired') OR
 (status = 'expired') OR
 (status = 'inactive') OR
 (delinquent = True) THEN 1 ELSE 0 END AS bad_loan,
 gdp(location.country_code, terms.disbursal_date) as gdp,
 xchange_rate(location.country_code, terms.disbursal_date) as xchange_rate,
 status,
 delinquent
 
FROM {}
WHERE
 status != 'fundraising' AND
 status != 'funded'
'''

# The query is deliberately NOT executed in this cell: the `gdp` and
# `xchange_rate` UDFs are only registered further down, so running it here
# fails on a fresh kernel. The export to 'validation_data-filtered.json' is
# done once, by the last cell of the notebook.

## Fetch GDP

In [3]:
import pandas as pd
from datetime import datetime
import numpy as np


# Load country info data.
# "NA" is Namibia's ISO 3166-1 alpha-2 code; pandas' default NA markers would
# silently turn it into NaN, so for the codes table only empty fields are
# treated as missing.
country_codes_raw = pd.read_csv('economic-data/country-codes.csv',
                                keep_default_na=False, na_values=[''])
country_gdp_raw = pd.read_csv('economic-data/country-gdp.csv')

In [4]:
# Country codes: keep only the English name, both ISO 3166-1 codes and the
# ISO 4217 currency code.
code_columns = [
    'official_name_en',
    'ISO3166-1-Alpha-2',
    'ISO3166-1-Alpha-3',
    'ISO4217-currency_alphabetic_code',
]
country_codes = country_codes_raw[code_columns]

# GDP: discard the first two columns of the raw file, then give the remaining
# ones short names (country name, alpha-3 code, one column per year 2002-2016).
leading_gdp_columns = country_gdp_raw.columns[[0, 1]]
country_gdp = country_gdp_raw.drop(leading_gdp_columns, axis=1)
country_gdp.columns = ['name', 'country_code_3'] + list(map(str, range(2002, 2017)))

In [5]:
# Merge GDP rows with their ISO codes (left join on the alpha-3 code), drop the
# columns that were only needed for the join, rename the code columns and turn
# the '..' placeholders into NaN.
country_gdp = (
    pd.merge(country_gdp, country_codes,
             left_on='country_code_3', right_on='ISO3166-1-Alpha-3', how='left')
    .drop(['official_name_en', 'ISO3166-1-Alpha-3', 'country_code_3'], axis=1)
    .rename(columns={'ISO3166-1-Alpha-2': 'country_code',
                     'ISO4217-currency_alphabetic_code': 'currency_code'})
    .replace('..', np.nan)
)

# Reorder columns: move the two code columns to positions 1 and 2, right
# after the first column.
cols = list(country_gdp.columns)
for position, label in enumerate(['country_code', 'currency_code'], 1):
    cols.insert(position, cols.pop(cols.index(label)))
country_gdp = country_gdp.reindex(columns=cols)

In [6]:
def gdp(country_code, disbursal_date):
    """Look up a country's GDP figure for the year a loan was disbursed.

    Reads the module-level ``country_gdp`` table (columns: name, country_code,
    currency_code, then one column per year).

    Args:
        country_code: ISO 3166-1 alpha-2 code as a string. ``None``, NaN or
            any numeric value is treated as "unknown country".
        disbursal_date: timestamp string formatted ``%Y-%m-%dT%H:%M:%SZ``,
            or ``None``.

    Returns:
        float:
          * ``0.0`` when the country is unknown or has no GDP values at all
            (TODO from the original author: questionable default);
          * the value for the disbursal year when available;
          * otherwise the mean of all available yearly values. This fallback
            covers a missing date, an unparsable date, a year with no column
            in the table, or a missing value for that year.

        The result is always a Python ``float``: a UDF declared with
        ``FloatType`` does not coerce Python ints, so returning ``0`` would
        surface as NULL in Spark.
    """
    def historical_gdp(values):
        # Mean of the non-missing yearly values; 0.0 when there are none.
        numeric = np.array([float(v) for v in values if not pd.isnull(v)],
                           dtype=np.float64)
        numeric = numeric[~np.isnan(numeric)]  # e.g. the string 'nan'
        if numeric.size == 0:
            return 0.0
        return float(numeric.mean())

    # TODO: Unable to resolve country code WorldBank dataset has wrong alpha 3
    # codes e.g. Andorra causing issues
    # A code that parses as a number (NaN included) is not a real country code.
    try:
        float(country_code)
    except (TypeError, ValueError):
        pass
    else:
        return 0.0

    matches = country_gdp[country_gdp.country_code == country_code]
    if matches.empty:
        return 0.0  # TODO: Bad solution ?

    # Year columns start after name / country_code / currency_code.
    all_gdp = matches.values[0][3:]

    year = None
    if disbursal_date is not None:
        try:
            year = str(datetime.strptime(disbursal_date, '%Y-%m-%dT%H:%M:%SZ').year)
        except (TypeError, ValueError):
            year = None  # unparsable date: fall back to the historical mean

    # No usable date, or a year the table has no column for.
    if year is None or year not in country_gdp.columns:
        return historical_gdp(all_gdp)

    value = matches[year].values[0]
    if pd.isnull(value):
        return historical_gdp(all_gdp)
    return float(value)

# Expose gdp(country_code, disbursal_date) to Spark SQL as a FLOAT-valued UDF.
sparkSql.udf.register(
    'gdp', gdp, returnType=pyspark.sql.types.FloatType()
)

## Fetch Exchange Rates

In [7]:
# Raw exchange-rate table; it is cleaned and joined with the ISO codes in the next cells.
currencies_raw = pd.read_csv('economic-data/currencies.csv')

In [8]:
# Cleanup: drop the first two columns of the raw currencies file and give the
# remaining ones short names (country name, alpha-3 code, one column per year).
# The columns to drop are taken from currencies_raw itself; the original used
# country_gdp_raw.columns, which only works while both files happen to share
# the same leading column names.
currencies = currencies_raw.drop(currencies_raw.columns[[0, 1]], axis=1)
currencies.columns = ['country_name', 'country_code_3', '2002', '2003', '2004', '2005', '2006',
                      '2007', '2008', '2009', '2010', '2011', '2012', '2013', '2014', '2015', '2016']

In [9]:
# Get ISO 2 code: left-join the ISO table on the alpha-3 code, drop the columns
# that were only needed for the join, rename the code columns and turn the
# '..' placeholders into NaN.
currencies = (
    pd.merge(currencies, country_codes,
             left_on='country_code_3', right_on='ISO3166-1-Alpha-3', how='left')
    .drop(['official_name_en', 'ISO3166-1-Alpha-3', 'country_code_3'], axis=1)
    .rename(columns={'ISO3166-1-Alpha-2': 'country_code',
                     'ISO4217-currency_alphabetic_code': 'currency_code'})
    .replace('..', np.nan)
)

# Add code for European Union.
# DataFrame.set_value was deprecated in pandas 0.21 and removed in 1.0;
# label-based .loc assignment is the supported equivalent.
# NOTE(review): row label 217 is hard-coded -- presumably the euro-area
# aggregate row of currencies.csv; confirm against the file.
currencies.loc[217, 'country_code'] = 'EU'
currencies.loc[217, 'currency_code'] = 'EMU'

# Reorder columns: move the two code columns to positions 1 and 2, right
# after the first column.
cols = list(currencies.columns)
cols.insert(1, cols.pop(cols.index('country_code')))
cols.insert(2, cols.pop(cols.index('currency_code')))
currencies = currencies.reindex(columns=cols)

In [10]:
def xchange_rate(country_code, disbursal_date):
    """Look up a country's exchange rate for the year a loan was disbursed.

    Reads the module-level ``currencies`` table (columns: country_name,
    country_code, currency_code, then one column per year).

    Args:
        country_code: ISO 3166-1 alpha-2 code as a string. ``None``, NaN or
            any numeric value is treated as "unknown country". Codes listed
            in ``eu`` are looked up under ``'EU'`` and those in ``us`` under
            ``'US'``.
        disbursal_date: timestamp string formatted ``%Y-%m-%dT%H:%M:%SZ``,
            or ``None``.

    Returns:
        float:
          * ``1.0`` (neutral rate) when the country is unknown or has no
            rates at all (TODO from the original author: questionable
            default);
          * the rate for the disbursal year when available;
          * otherwise the mean of all available yearly rates. This fallback
            covers a missing date, an unparsable date, a year with no column
            in the table, or a missing value for that year.

        The original carried a second check copied from ``gdp`` that returned
        0 for numeric codes; every "unknown" case now yields the same neutral
        1.0. The result is always a Python ``float``: a UDF declared with
        ``FloatType`` does not coerce Python ints, so returning ``1`` would
        surface as NULL in Spark.
    """
    def historical_rates(values):
        # Mean of the non-missing yearly rates; 1.0 when there are none.
        numeric = np.array([float(v) for v in values if not pd.isnull(v)],
                           dtype=np.float64)
        numeric = numeric[~np.isnan(numeric)]  # e.g. the string 'nan'
        if numeric.size == 0:
            return 1.0
        return float(numeric.mean())

    # NOTE(review): this list is the EU membership, which also contains
    # countries with their own currency (e.g. GB, SE, PL); all of them are
    # looked up under the single 'EU' row -- confirm this is intended.
    eu = ['AT', 'BE', 'BG', 'HR', 'CY', 'CZ', 'DK', 'EE', 'FI', 'FR', 'DE', 'GR', 'HU', 'IE',
          'IT', 'LV', 'LT', 'LU', 'MT', 'NL', 'PL', 'PT', 'RO', 'SK', 'SI', 'ES', 'SE', 'GB']
    us = ['AS', 'GU', 'MP', 'PR', 'UM', 'VI']

    # A code that parses as a number (NaN included) is not a real country code.
    # TODO: Unable to resolve country code WorldBank dataset has wrong alpha 3
    # codes e.g. Andorra causing issues
    try:
        float(country_code)
    except (TypeError, ValueError):
        pass
    else:
        return 1.0  # TODO: Bad solution ??

    if country_code in eu:
        country_code = 'EU'
    elif country_code in us:
        country_code = 'US'

    matches = currencies[currencies.country_code == country_code]
    if matches.empty:
        return 1.0

    # Year columns start after country_name / country_code / currency_code.
    all_rates = matches.values[0][3:]

    year = None
    if disbursal_date is not None:
        try:
            year = str(datetime.strptime(disbursal_date, '%Y-%m-%dT%H:%M:%SZ').year)
        except (TypeError, ValueError):
            year = None  # unparsable date: fall back to the historical mean

    # No usable date, or a year the table has no column for.
    if year is None or year not in currencies.columns:
        return historical_rates(all_rates)

    value = matches[year].values[0]
    if pd.isnull(value):
        return historical_rates(all_rates)
    return float(value)

# Expose xchange_rate(country_code, disbursal_date) to Spark SQL as a FLOAT-valued UDF.
sparkSql.udf.register(
    'xchange_rate', xchange_rate, returnType=pyspark.sql.types.FloatType()
)

In [13]:
# To preview a few rows instead of exporting:
#   sparkSql.sql(query.format('loans_validation')).take(10)
# Export the engineered validation features as JSON. mode('overwrite') keeps
# the cell re-runnable: with the default save mode the write fails as soon as
# the output path exists from a previous run.
(sparkSql.sql(query.format('loans_validation'))
    .write
    .mode('overwrite')
    .json('validation_data-filtered.json'))

[Row(id=507280, activity=u'Agriculture', num_borrowers=10, male_proportion=0.10000000149011612, lender_count=91, country=u'Rwanda', country_code=u'RW', partner_id=170, sector=u'Agriculture', tags=[], loan_length=-59, disbursal_amount=1500000.0, disbursal_currency=u'RWF', disbursal_date=u'2012-11-15T08:00:00Z', num_repayments=1, repayment_interval=u'At end of term', bad_loan=0, gdp=667.4146118164062, xchange_rate=614.295166015625, status=u'paid', delinquent=None),
 Row(id=508860, activity=u'Agriculture', num_borrowers=1, male_proportion=1.0, lender_count=28, country=u'Rwanda', country_code=u'RW', partner_id=170, sector=u'Agriculture', tags=[], loan_length=-52, disbursal_amount=500000.0, disbursal_currency=u'RWF', disbursal_date=u'2012-11-26T08:00:00Z', num_repayments=1, repayment_interval=u'At end of term', bad_loan=0, gdp=667.4146118164062, xchange_rate=614.295166015625, status=u'paid', delinquent=None),
 Row(id=498729, activity=u'Agriculture', num_borrowers=1, male_proportion=0.0, len