gdp-currency-UDFs

UDFs for GDP and currency exchange rates lookup
pull/1/head
karlloic 2016-11-06 22:32:45 -05:00
parent e7d0a98bc5
commit 7d6ce67194
1 changed files with 263 additions and 3 deletions

View File

@ -136,7 +136,7 @@
},
{
"cell_type": "code",
"execution_count": 43,
"execution_count": 12,
"metadata": {
"collapsed": false
},
@ -185,6 +185,8 @@
" (status = 'expired') OR\n",
" (status = 'inactive') OR\n",
" (delinquent = True) THEN 1 ELSE 0 END AS bad_loan,\n",
" gdp(location.country_code, terms.disbursal_date) as gdp,\n",
" xchange_rate(location.country_code, terms.disbursal_date) as xchange_rate,\n",
" status,\n",
" delinquent\n",
" \n",
@ -200,6 +202,264 @@
"\n",
"sparkSql.sql(query.format('loans_validation')).write.json('validation_data-filtered.json')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Fetch GDP"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import pandas as pd\n",
"from datetime import datetime\n",
"import numpy as np\n",
"\n",
"\n",
"# Load country info data\n",
"country_codes_raw = pd.read_csv('economic-data/country-codes.csv')\n",
"country_gdp_raw = pd.read_csv('economic-data/country-gdp.csv')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Clean country codes data\n",
"country_codes = country_codes_raw[['official_name_en', 'ISO3166-1-Alpha-2', \n",
" 'ISO3166-1-Alpha-3', 'ISO4217-currency_alphabetic_code']]\n",
"\n",
"# Clean gdp data\n",
"country_gdp = country_gdp_raw.drop(country_gdp_raw.columns[[0, 1]], axis=1)\n",
"country_gdp.columns = ['name', 'country_code_3', '2002', '2003', '2004', '2005', '2006',\n",
" '2007', '2008', '2009', '2010', '2011', '2012', '2013', '2014', '2015', '2016']"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Merde gdp and code\n",
"country_gdp = pd.merge(country_gdp, country_codes, left_on='country_code_3', right_on='ISO3166-1-Alpha-3', how='left')\n",
"country_gdp.drop(['official_name_en', 'ISO3166-1-Alpha-3', 'country_code_3'], axis=1, inplace=True)\n",
"country_gdp = country_gdp.rename(columns = {'ISO3166-1-Alpha-2':'country_code',\n",
" 'ISO4217-currency_alphabetic_code':'currency_code'})\n",
"country_gdp.replace('..', np.nan, inplace=True)\n",
"\n",
"# Reorder columns\n",
"cols = list(country_gdp.columns)\n",
"cols.insert(1, cols.pop(cols.index('country_code')))\n",
"cols.insert(2, cols.pop(cols.index('currency_code')))\n",
"country_gdp = country_gdp.reindex(columns= cols)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def gdp(country_code, disbursal_date):\n",
" def historical_gdp(array):\n",
" array = np.array(map(float, array))\n",
" array = array[~np.isnan(array)] # Remove NaN\n",
" if len(array) == 0: # No GDP values\n",
" return 0\n",
" return float(np.mean(array, dtype=np.float64))\n",
" \n",
" # TODO: Unable to resolve country code WorldBank dataset has wrong alpha 3 codes e.g. Andorra causing issues\n",
" try:\n",
" float(country_code)\n",
" return 0\n",
" except:\n",
" if country_code not in list(country_gdp['country_code']):\n",
" return 0 # TODO: Bad solution ? \n",
" \n",
" # Get the historical average GDP if no disbursal date\n",
" all_gdp = country_gdp[country_gdp.country_code == country_code].values[0][3:]\n",
" if (disbursal_date is None): # or (country_gdp[date][country_gdp.country_code == country_code] == float('Nan')):\n",
" return historical_gdp(all_gdp)\n",
" \n",
" date = str(datetime.strptime(disbursal_date, '%Y-%m-%dT%H:%M:%SZ').year)\n",
" # Get the historical average GDP if no GDP for that year\n",
" if pd.isnull(country_gdp[date][country_gdp.country_code == country_code].values[0]):\n",
" return historical_gdp(all_gdp)\n",
" \n",
" return float(country_gdp[date][country_gdp.country_code == country_code].values[0])\n",
"\n",
"sparkSql.udf.register('gdp', gdp, pyspark.sql.types.FloatType())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Fetch Exchange Rates"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"currencies_raw = pd.read_csv('economic-data/currencies.csv')"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Cleanup\n",
"currencies = currencies_raw.drop(country_gdp_raw.columns[[0, 1]], axis=1)\n",
"currencies.columns = ['country_name', 'country_code_3', '2002', '2003', '2004', '2005', '2006',\n",
" '2007', '2008', '2009', '2010', '2011', '2012', '2013', '2014', '2015', '2016']"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Get ISO 2 code\n",
"currencies = pd.merge(currencies, country_codes, left_on='country_code_3', right_on='ISO3166-1-Alpha-3', how='left')\n",
"currencies.drop(['official_name_en', 'ISO3166-1-Alpha-3', 'country_code_3'], axis=1, inplace=True)\n",
"currencies = currencies.rename(columns = {'ISO3166-1-Alpha-2':'country_code',\n",
" 'ISO4217-currency_alphabetic_code':'currency_code'})\n",
"currencies.replace('..', np.nan, inplace=True)\n",
"\n",
"# Add code for European Union\n",
"currencies.set_value(217, 'country_code', 'EU')\n",
"currencies.set_value(217, 'currency_code', 'EMU')\n",
"\n",
"# Reorder columns\n",
"cols = list(currencies.columns)\n",
"cols.insert(1, cols.pop(cols.index('country_code')))\n",
"cols.insert(2, cols.pop(cols.index('currency_code')))\n",
"currencies = currencies.reindex(columns=cols)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def xchange_rate(country_code, disbursal_date):\n",
" def historical_rates(array):\n",
" array = np.array(map(float, array))\n",
" array = array[~np.isnan(array)] # Remove NaN\n",
" if len(array) == 0: # No rate values\n",
" return 1\n",
" return float(np.mean(array, dtype=np.float64))\n",
" \n",
" eu = ['AT','BE','BG','HR','CY','CZ','DK','EE','FI','FR','DE','GR','HU','IE',\n",
" 'IT','LV','LT','LU','MT','NL','PL','PT','RO','SK','SI','ES','SE','GB']\n",
" us = ['AS','GU','MP','PR','UM','VI']\n",
" try:\n",
" float(country_code) # Country code unknown?\n",
" if pd.isnull(country_code):\n",
" return 1 # TODO: Bad solution ??\n",
" except:\n",
" if country_code in eu:\n",
" country_code = 'EU'\n",
" elif country_code in us:\n",
" country_code = 'US'\n",
" if country_code not in list(currencies['country_code']):\n",
" return 1\n",
" \n",
" \n",
" # TODO: Unable to resolve country code WorldBank dataset has wrong alpha 3 codes e.g. Andorra causing\n",
" try:\n",
" float(country_code)\n",
" return 0\n",
" except:\n",
" if country_code not in list(currencies['country_code']):\n",
" return 0 # TODO: Bad solution \n",
" \n",
" # Get the historical average exchange rate if no disbursal date\n",
" all_rates = currencies[currencies.country_code == country_code].values[0][3:]\n",
" if (disbursal_date is None): # or (country_gdp[date][country_gdp.country_code == country_code] == float('Nan')):\n",
" return historical_rates(all_rates)\n",
" \n",
" date = str(datetime.strptime(disbursal_date, '%Y-%m-%dT%H:%M:%SZ').year)\n",
" # Get the historical average exchange rate if no GDP for that year\n",
" if pd.isnull(currencies[date][currencies.country_code == country_code].values[0]):\n",
" return historical_rates(all_rates)\n",
" \n",
" return float(currencies[date][currencies.country_code == country_code].values[0])\n",
"\n",
"sparkSql.udf.register('xchange_rate', xchange_rate, pyspark.sql.types.FloatType())"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[Row(id=507280, activity=u'Agriculture', num_borrowers=10, male_proportion=0.10000000149011612, lender_count=91, country=u'Rwanda', country_code=u'RW', partner_id=170, sector=u'Agriculture', tags=[], loan_length=-59, disbursal_amount=1500000.0, disbursal_currency=u'RWF', disbursal_date=u'2012-11-15T08:00:00Z', num_repayments=1, repayment_interval=u'At end of term', bad_loan=0, gdp=667.4146118164062, xchange_rate=614.295166015625, status=u'paid', delinquent=None),\n",
" Row(id=508860, activity=u'Agriculture', num_borrowers=1, male_proportion=1.0, lender_count=28, country=u'Rwanda', country_code=u'RW', partner_id=170, sector=u'Agriculture', tags=[], loan_length=-52, disbursal_amount=500000.0, disbursal_currency=u'RWF', disbursal_date=u'2012-11-26T08:00:00Z', num_repayments=1, repayment_interval=u'At end of term', bad_loan=0, gdp=667.4146118164062, xchange_rate=614.295166015625, status=u'paid', delinquent=None),\n",
" Row(id=498729, activity=u'Agriculture', num_borrowers=1, male_proportion=0.0, lender_count=6, country=u'Kenya', country_code=u'KE', partner_id=133, sector=u'Agriculture', tags=[], loan_length=-38, disbursal_amount=20000.0, disbursal_currency=u'KES', disbursal_date=u'2012-11-13T08:00:00Z', num_repayments=12, repayment_interval=u'Monthly', bad_loan=0, gdp=1184.9232177734375, xchange_rate=84.52960205078125, status=u'paid', delinquent=None),\n",
" Row(id=501877, activity=u'Agriculture', num_borrowers=1, male_proportion=1.0, lender_count=14, country=u'Peru', country_code=u'PE', partner_id=71, sector=u'Agriculture', tags=[], loan_length=-39, disbursal_amount=1000.0, disbursal_currency=u'PEN', disbursal_date=u'2012-11-20T08:00:00Z', num_repayments=8, repayment_interval=u'Monthly', bad_loan=0, gdp=6389.63037109375, xchange_rate=2.6375863552093506, status=u'paid', delinquent=None),\n",
" Row(id=504386, activity=u'Agriculture', num_borrowers=1, male_proportion=1.0, lender_count=16, country=u'Benin', country_code=u'BJ', partner_id=104, sector=u'Agriculture', tags=[], loan_length=-58, disbursal_amount=190000.0, disbursal_currency=u'XOF', disbursal_date=u'2012-11-08T08:00:00Z', num_repayments=4, repayment_interval=u'Irregularly', bad_loan=0, gdp=807.6884765625, xchange_rate=510.5271301269531, status=u'paid', delinquent=None),\n",
" Row(id=510144, activity=u'Agriculture', num_borrowers=1, male_proportion=1.0, lender_count=7, country=u'Senegal', country_code=u'SN', partner_id=108, sector=u'Agriculture', tags=[], loan_length=-53, disbursal_amount=150000.0, disbursal_currency=u'XOF', disbursal_date=u'2012-11-27T08:00:00Z', num_repayments=12, repayment_interval=u'Monthly', bad_loan=0, gdp=1019.272216796875, xchange_rate=510.5271301269531, status=u'paid', delinquent=None),\n",
" Row(id=497262, activity=u'Agriculture', num_borrowers=1, male_proportion=0.0, lender_count=11, country=u'Nicaragua', country_code=u'NI', partner_id=74, sector=u'Agriculture', tags=[], loan_length=-35, disbursal_amount=7000.0, disbursal_currency=u'NIO', disbursal_date=u'2012-11-14T08:00:00Z', num_repayments=1, repayment_interval=u'At end of term', bad_loan=0, gdp=1776.209228515625, xchange_rate=23.546663284301758, status=u'paid', delinquent=None),\n",
" Row(id=503327, activity=u'Agriculture', num_borrowers=1, male_proportion=0.0, lender_count=7, country=u'Mexico', country_code=u'MX', partner_id=224, sector=u'Agriculture', tags=[], loan_length=-7, disbursal_amount=3000.0, disbursal_currency=u'MXN', disbursal_date=u'2012-12-28T08:00:00Z', num_repayments=1, repayment_interval=u'At end of term', bad_loan=0, gdp=9720.5615234375, xchange_rate=13.169458389282227, status=u'paid', delinquent=None),\n",
" Row(id=500119, activity=u'Agriculture', num_borrowers=1, male_proportion=0.0, lender_count=30, country=u'Mexico', country_code=u'MX', partner_id=224, sector=u'Agriculture', tags=[], loan_length=6, disbursal_amount=12000.0, disbursal_currency=u'MXN', disbursal_date=u'2012-12-28T08:00:00Z', num_repayments=1, repayment_interval=u'At end of term', bad_loan=0, gdp=9720.5615234375, xchange_rate=13.169458389282227, status=u'paid', delinquent=None),\n",
" Row(id=153403, activity=u'Agriculture', num_borrowers=1, male_proportion=0.0, lender_count=37, country=u'Togo', country_code=u'TG', partner_id=22, sector=u'Agriculture', tags=[], loan_length=None, disbursal_amount=450000.0, disbursal_currency=u'XOF', disbursal_date=u'2009-10-26T07:00:00Z', num_repayments=14, repayment_interval=u'Irregularly', bad_loan=1, gdp=508.54052734375, xchange_rate=472.186279296875, status=u'defaulted', delinquent=True)]"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# sparkSql.sql(query.format('loans_validation')).take(10)\n",
"sparkSql.sql(query.format('loans_validation')).write.json('validation_data-filtered.json')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
@ -218,9 +478,9 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 1
"nbformat_minor": 0
}