Can now fetch LIBOR data correctly

master
Bradlee Speice 2016-08-12 20:18:12 -04:00
parent 14e0b404dd
commit 98d669eab3
8 changed files with 84 additions and 19 deletions

9
metrik/batch.py Normal file
View File

@ -0,0 +1,9 @@
from luigi import build
from metrik.flows.libor_flow import LiborFlow
from datetime import datetime
if __name__ == '__main__':
l = LiborFlow(datetime(2016, 5, 9).date())
build([l], local_scheduler=True)

View File

@ -1 +1,4 @@
USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:47.0) Gecko/20100101 Firefox/47.0" USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:47.0) Gecko/20100101 Firefox/47.0"
MONGO_HOST = 'localhost'
MONGO_PORT = 27017
MONGO_DATABASE = 'metrik'

0
metrik/flows/__init__.py Normal file
View File

View File

@ -0,0 +1,13 @@
from luigi import Task, DateParameter, LocalTarget
from metrik.tasks.ice import LiborRateTask
from metrik.targets.temp_file import TempFileTarget
class LiborFlow(Task):
date = DateParameter()
def requires(self):
currencies = ['USD']
return [LiborRateTask(self.date, currency)
for currency in currencies]

View File

@ -0,0 +1,24 @@
from luigi import Target
from pymongo import MongoClient
from metrik.conf import MONGO_HOST, MONGO_PORT, MONGO_DATABASE
class MongoTarget(Target):
def __init__(self, collection, id):
self.connection = MongoClient(MONGO_HOST, MONGO_PORT)[MONGO_DATABASE]
self.collection = self.connection[collection]
self.id = id
def exists(self):
return self.collection.find_one({
'_id': self.id
}) is not None
def persist(self, dict_object):
id_dict = dict_object
id_dict['_id'] = self.id
return self.collection.insert_one(id_dict).inserted_id
def retrieve(self):
return self.collection.find_one({'_id': self.id})

View File

@ -1,24 +1,12 @@
from luigi.target import Target from pickle import dump, load
from pickle import dump, dumps, load, loads
from os.path import exists, join from metrik.targets.temp_file import TempFileTarget
from os import unlink
from tempfile import tempdir
import base64
class PickleTarget(Target): class PickleTarget(TempFileTarget):
def __init__(self, name): def write(self, obj):
self.name = name
def full_path(self):
return join(tempdir, self.name)
def exists(self):
return exists(self.full_path())
def write(self, object):
with open(self.full_path(), 'w+b') as handle: with open(self.full_path(), 'w+b') as handle:
dump(object, handle) dump(obj, handle)
def read(self): def read(self):
return load(open(self.full_path(), 'rb')) return load(open(self.full_path(), 'rb'))

View File

@ -0,0 +1,15 @@
from os.path import exists, join
from tempfile import tempdir
from luigi.target import Target
class TempFileTarget(Target):
def __init__(self, name):
self.name = name
def full_path(self):
return join(tempdir, self.name)
def exists(self):
return exists(self.full_path())

View File

@ -1,4 +1,5 @@
from luigi.task import Task from luigi.task import Task
from luigi.parameter import DateParameter, Parameter
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
from six.moves.urllib.parse import quote_plus from six.moves.urllib.parse import quote_plus
import pytz import pytz
@ -9,6 +10,8 @@ import csv
from io import StringIO from io import StringIO
from dateutil.parser import parse from dateutil.parser import parse
from metrik.targets.mongo_target import MongoTarget
LiborRate = namedtuple('LiborRate', [ LiborRate = namedtuple('LiborRate', [
'publication', 'overnight', 'one_week', 'one_month', 'two_month', 'publication', 'overnight', 'one_week', 'one_month', 'two_month',
@ -18,6 +21,16 @@ LiborRate = namedtuple('LiborRate', [
class LiborRateTask(Task): class LiborRateTask(Task):
date = DateParameter()
currency = Parameter()
def output(self):
return MongoTarget('libor', hash(self.task_id))
def run(self):
libor_record = self.retrieve_data(self.date, self.currency)
self.output().persist(libor_record._asdict())
@staticmethod @staticmethod
def retrieve_data(date, currency): def retrieve_data(date, currency):
url = ('https://www.theice.com/marketdata/reports/icebenchmarkadmin/' url = ('https://www.theice.com/marketdata/reports/icebenchmarkadmin/'