From 98d669eab3c16ac9ee9e27fe0b63f6ba4c70797e Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Fri, 12 Aug 2016 20:18:12 -0400 Subject: [PATCH] Can now fetch LIBOR data correctly --- metrik/batch.py | 9 +++++++++ metrik/conf.py | 5 ++++- metrik/flows/__init__.py | 0 metrik/flows/libor_flow.py | 13 +++++++++++++ metrik/targets/mongo_target.py | 24 ++++++++++++++++++++++++ metrik/targets/pickle_target.py | 24 ++++++------------------ metrik/targets/temp_file.py | 15 +++++++++++++++ metrik/tasks/ice.py | 13 +++++++++++++ 8 files changed, 84 insertions(+), 19 deletions(-) create mode 100644 metrik/batch.py create mode 100644 metrik/flows/__init__.py create mode 100644 metrik/flows/libor_flow.py create mode 100644 metrik/targets/mongo_target.py create mode 100644 metrik/targets/temp_file.py diff --git a/metrik/batch.py b/metrik/batch.py new file mode 100644 index 0000000..e7e4ef4 --- /dev/null +++ b/metrik/batch.py @@ -0,0 +1,9 @@ +from luigi import build +from metrik.flows.libor_flow import LiborFlow +from datetime import datetime + + +if __name__ == '__main__': + l = LiborFlow(datetime(2016, 5, 9).date()) + + build([l], local_scheduler=True) \ No newline at end of file diff --git a/metrik/conf.py b/metrik/conf.py index 8f0583a..ae1d084 100644 --- a/metrik/conf.py +++ b/metrik/conf.py @@ -1 +1,4 @@ -USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:47.0) Gecko/20100101 Firefox/47.0" \ No newline at end of file +USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:47.0) Gecko/20100101 Firefox/47.0" +MONGO_HOST = 'localhost' +MONGO_PORT = 27017 +MONGO_DATABASE = 'metrik' \ No newline at end of file diff --git a/metrik/flows/__init__.py b/metrik/flows/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/metrik/flows/libor_flow.py b/metrik/flows/libor_flow.py new file mode 100644 index 0000000..3ef4e01 --- /dev/null +++ b/metrik/flows/libor_flow.py @@ -0,0 +1,13 @@ +from luigi import Task, DateParameter, LocalTarget + +from metrik.tasks.ice import LiborRateTask +from metrik.targets.temp_file import TempFileTarget + + +class LiborFlow(Task): + date = DateParameter() + + def requires(self): + currencies = ['USD'] + return [LiborRateTask(self.date, currency) + for currency in currencies] diff --git a/metrik/targets/mongo_target.py b/metrik/targets/mongo_target.py new file mode 100644 index 0000000..0387532 --- /dev/null +++ b/metrik/targets/mongo_target.py @@ -0,0 +1,24 @@ +from luigi import Target +from pymongo import MongoClient +from metrik.conf import MONGO_HOST, MONGO_PORT, MONGO_DATABASE + + +class MongoTarget(Target): + + def __init__(self, collection, id): + self.connection = MongoClient(MONGO_HOST, MONGO_PORT)[MONGO_DATABASE] + self.collection = self.connection[collection] + self.id = id + + def exists(self): + return self.collection.find_one({ + '_id': self.id + }) is not None + + def persist(self, dict_object): + id_dict = dict_object + id_dict['_id'] = self.id + return self.collection.insert_one(id_dict).inserted_id + + def retrieve(self): + return self.collection.find_one({'_id': self.id}) diff --git a/metrik/targets/pickle_target.py b/metrik/targets/pickle_target.py index 55eb57a..7453e7c 100644 --- a/metrik/targets/pickle_target.py +++ b/metrik/targets/pickle_target.py @@ -1,24 +1,12 @@ -from luigi.target import Target -from pickle import dump, dumps, load, loads -from os.path import exists, join -from os import unlink -from tempfile import tempdir -import base64 +from pickle import dump, load + +from metrik.targets.temp_file import TempFileTarget -class PickleTarget(Target): - def __init__(self, name): - self.name = name - - def full_path(self): - return join(tempdir, self.name) - - def exists(self): - return exists(self.full_path()) - - def write(self, object): +class PickleTarget(TempFileTarget): + def write(self, obj): with open(self.full_path(), 'w+b') as handle: - dump(object, handle) + dump(obj, handle) def read(self): return load(open(self.full_path(), 'rb')) diff --git a/metrik/targets/temp_file.py b/metrik/targets/temp_file.py new file mode 100644 index 0000000..fc444d7 --- /dev/null +++ b/metrik/targets/temp_file.py @@ -0,0 +1,15 @@ +from os.path import exists, join +from tempfile import tempdir + +from luigi.target import Target + + +class TempFileTarget(Target): + def __init__(self, name): + self.name = name + + def full_path(self): + return join(tempdir, self.name) + + def exists(self): + return exists(self.full_path()) \ No newline at end of file diff --git a/metrik/tasks/ice.py b/metrik/tasks/ice.py index c3b2369..52a57cd 100644 --- a/metrik/tasks/ice.py +++ b/metrik/tasks/ice.py @@ -1,4 +1,5 @@ from luigi.task import Task +from luigi.parameter import DateParameter, Parameter # noinspection PyUnresolvedReferences from six.moves.urllib.parse import quote_plus import pytz @@ -9,6 +10,8 @@ import csv from io import StringIO from dateutil.parser import parse +from metrik.targets.mongo_target import MongoTarget + LiborRate = namedtuple('LiborRate', [ 'publication', 'overnight', 'one_week', 'one_month', 'two_month', @@ -18,6 +21,16 @@ LiborRate = namedtuple('LiborRate', [ class LiborRateTask(Task): + date = DateParameter() + currency = Parameter() + + def output(self): + return MongoTarget('libor', hash(self.task_id)) + + def run(self): + libor_record = self.retrieve_data(self.date, self.currency) + self.output().persist(libor_record._asdict()) + @staticmethod def retrieve_data(date, currency): url = ('https://www.theice.com/marketdata/reports/icebenchmarkadmin/'