In [1]:
# Imported in this cell so it also runs on a fresh kernel that does not
# already provide SparkSession in the namespace.
from pyspark.sql import SparkSession

# Local-mode Spark session used to read the data and run SQL.
sparkSql = (
    SparkSession.builder
    .master("local")
    .appName("Kiva Exploration")
    .getOrCreate()
)

# Load each Kiva JSON file and expose it to Spark SQL under a matching name.
# createOrReplaceTempView is the replacement for registerTempTable, which has
# been deprecated since Spark 2.0.
loans = sparkSql.read.format('json').load('kiva-data/loans.json')
loans.createOrReplaceTempView('loans')

lenders = sparkSql.read.format('json').load('kiva-data/lenders.json')
lenders.createOrReplaceTempView('lenders')

loans_lenders = sparkSql.read.format('json').load('kiva-data/loans_lenders.json')
loans_lenders.createOrReplaceTempView('loans_lenders')

In [2]:
# Print the schema of the loans DataFrame to see which fields are available.
loans.printSchema()

root
 |-- activity: string (nullable = true)
 |-- basket_amount: long (nullable = true)
 |-- bonus_credit_eligibility: boolean (nullable = true)
 |-- borrowers: array (nullable = true)
 | |-- element: struct (containsNull = true)
 | | |-- first_name: string (nullable = true)
 | | |-- gender: string (nullable = true)
 | | |-- last_name: string (nullable = true)
 | | |-- pictured: boolean (nullable = true)
 |-- currency_exchange_loss_amount: double (nullable = true)
 |-- delinquent: boolean (nullable = true)
 |-- description: struct (nullable = true)
 | |-- languages: array (nullable = true)
 | | |-- element: string (containsNull = true)
 | |-- texts: struct (nullable = true)
 | | |-- ar: string (nullable = true)
 | | |-- en: string (nullable = true)
 | | |-- es: string (nullable = true)
 | | |-- fr: string (nullable = true)
 | | |-- id: string (nullable = true)
 | | |-- mn: string (nullable = true)
 | | |-- pt: string (nullable = true)
 | | |-- ru: string (nullable = true)
 | | |-- vi: 

In [43]:
import pyspark

def male_proportion(array):
    """Return the fraction of entries in ``array`` whose gender is ``'M'``.

    Args:
        array: Sequence of records exposing a ``gender`` attribute (the
            ``borrowers`` column when invoked as a Spark SQL UDF). May be
            ``None`` or empty, and may contain ``None`` elements.

    Returns:
        ``float(number of 'M' entries) / len(array)``, or ``None`` when
        ``array`` is ``None`` or empty so the UDF yields SQL NULL instead of
        raising.
    """
    # The column is nullable and may be empty; dividing by len(array)
    # unconditionally would raise TypeError / ZeroDivisionError and fail the
    # whole Spark job.
    if not array:
        return None

    # Null elements are skipped for the gender test but still count toward the
    # denominator, keeping it equal to size(borrowers).
    num_males = sum(
        1 for item in array if item is not None and item.gender == 'M'
    )
    return float(num_males) / len(array)

# Make male_proportion callable from Spark SQL under the same name, declared
# as returning a float.
sparkSql.udf.register(
    'male_proportion',
    male_proportion,
    returnType=pyspark.sql.types.FloatType(),
)

# Seeded random split of the loans DataFrame with weights 0.6 / 0.2 / 0.2.
train, validation, test = loans.randomSplit(weights=[0.6, 0.2, 0.2], seed=101)

# SQL template that projects per-loan feature columns plus a binary label.
# The `{}` placeholder is filled with a table name via str.format().
#   - num_borrowers / num_repayments are array sizes of borrowers and
#     terms.scheduled_payments.
#   - male_proportion(...) calls the UDF registered under that name.
#   - bad_loan is 1 when status is one of the listed values or delinquent is
#     true, otherwise 0.
#   - Rows with status 'fundraising' or 'funded' are filtered out.
# NOTE(review): loan_length is DATEDIFF(terms.disbursal_date,
# planned_expiration_date), i.e. presumably disbursal date minus planned
# expiration date in days — confirm this is the intended "loan length".
query = '''
SELECT
 id,
 activity,
 size(borrowers) as num_borrowers,
 male_proportion(borrowers) as male_proportion,
 lender_count,
 location.country,
 location.country_code,
 partner_id,
 sector,
 tags,
 DATEDIFF(terms.disbursal_date, planned_expiration_date) as loan_length,
 terms.disbursal_amount,
 terms.disbursal_currency,
 terms.disbursal_date,
 size(terms.scheduled_payments) as num_repayments,
 terms.repayment_interval,
 CASE WHEN
 (status = 'refunded') OR
 (status = 'defaulted') OR
 (status = 'deleted') OR
 (status = 'issue') OR
 (status = 'inactive_expired') OR
 (status = 'expired') OR
 (status = 'inactive') OR
 (delinquent = True) THEN 1 ELSE 0 END AS bad_loan,
 status,
 delinquent
 
FROM {}
WHERE
 status != 'fundraising' AND
 status != 'funded'
'''

# Expose each split to Spark SQL so the query template can be pointed at it by
# name. createOrReplaceTempView is the replacement for registerTempTable,
# which has been deprecated since Spark 2.0.
train.createOrReplaceTempView('loans_train')
validation.createOrReplaceTempView('loans_validation')
test.createOrReplaceTempView('loans_test')

# Run the feature query against the validation split and write the result as
# JSON.
# NOTE(review): only the validation split is written in this cell, and the
# writer's default save mode presumably errors if the output path already
# exists — confirm before re-running.
(
    sparkSql
    .sql(query.format('loans_validation'))
    .write
    .json('validation_data-filtered.json')
)