mirror of
https://github.com/bspeice/metrik
synced 2025-07-03 06:45:07 -04:00
Initial Mongo refactoring work
This commit is contained in:
29
metrik/tasks/base.py
Normal file
29
metrik/tasks/base.py
Normal file
@ -0,0 +1,29 @@
|
||||
from __future__ import print_function
|
||||
from luigi import Task
|
||||
|
||||
from metrik.targets.mongo_target import MongoTarget
|
||||
|
||||
|
||||
class MongoCreateTask(Task):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(MongoCreateTask, self).__init__(*args, **kwargs)
|
||||
self.mongo_id = hash(str(self.to_str_params()))
|
||||
|
||||
def get_collection_name(self):
|
||||
raise NotImplementedError('Please set the collection name.')
|
||||
|
||||
def output(self):
|
||||
return MongoTarget(self.get_collection_name(), self.mongo_id)
|
||||
|
||||
def run(self):
|
||||
p_names = self.get_params()
|
||||
p_values = self.get_param_values(p_names, [], self.param_kwargs)
|
||||
print(p_values)
|
||||
data_dict = self.retrieve_data(**dict(p_values))
|
||||
data_dict['_id'] = self.mongo_id
|
||||
self.output().persist(data_dict)
|
||||
|
||||
@staticmethod
|
||||
def retrieve_data(self, *args, **kwargs):
|
||||
raise NotImplementedError('Get me some data!')
|
||||
|
@ -12,7 +12,7 @@ from io import StringIO
|
||||
from dateutil.parser import parse
|
||||
|
||||
from metrik.targets.mongo_target import MongoTarget
|
||||
|
||||
from metrik.tasks.base import MongoCreateTask
|
||||
|
||||
LiborRate = namedtuple('LiborRate', [
|
||||
'publication', 'overnight', 'one_week', 'one_month', 'two_month',
|
||||
@ -20,18 +20,13 @@ LiborRate = namedtuple('LiborRate', [
|
||||
])
|
||||
|
||||
|
||||
class LiborRateTask(Task):
|
||||
class LiborRateTask(MongoCreateTask):
|
||||
|
||||
date = DateParameter()
|
||||
currency = Parameter()
|
||||
|
||||
def output(self):
|
||||
h = hash(str(self.to_str_params()))
|
||||
return MongoTarget('libor', h)
|
||||
|
||||
def run(self):
|
||||
libor_record = self.retrieve_data(self.date, self.currency)
|
||||
self.output().persist(libor_record._asdict())
|
||||
def get_collection_name(self):
|
||||
return 'libor'
|
||||
|
||||
@staticmethod
|
||||
def retrieve_data(date, currency):
|
||||
@ -46,7 +41,9 @@ class LiborRateTask(Task):
|
||||
text = requests.get(url).text
|
||||
f = StringIO(text)
|
||||
next(f) # Skip the header
|
||||
record = {'currency': currency}
|
||||
|
||||
# TODO: Messing with globals() is probably a terrible idea, is there
|
||||
# a better way to write the below code?
|
||||
for row in csv.DictReader(f, fieldnames=fields):
|
||||
mapping = {
|
||||
'Overnight': 'overnight',
|
||||
@ -58,7 +55,8 @@ class LiborRateTask(Task):
|
||||
'1 Year': 'one_year'
|
||||
}
|
||||
if row['usd_ice_libor']:
|
||||
record[mapping[row['tenor']]] = float(row['usd_ice_libor'])
|
||||
globals()[mapping[row['tenor']]] = float(row['usd_ice_libor'])
|
||||
|
||||
if row['publication']:
|
||||
# Weird things happen with the publication field. For whatever reason,
|
||||
# the *time* is correct, but very often the date gets screwed up.
|
||||
@ -66,6 +64,19 @@ class LiborRateTask(Task):
|
||||
# download with `requests`, I see both date (often incorrect) and time.
|
||||
dt = parse(row['publication'])
|
||||
dt = dt.replace(year=date.year, month=date.month, day=date.day)
|
||||
record['publication'] = dt
|
||||
globals()['publication'] = dt
|
||||
|
||||
return LiborRate(**record)
|
||||
# Because of the shenanigans I did earlier with locals(), ignore
|
||||
# unresolved references. Probably a better way to do this.
|
||||
# noinspection PyUnresolvedReferences
|
||||
return {
|
||||
'currency': currency,
|
||||
'publication': publication,
|
||||
'overnight': overnight,
|
||||
'one_week': one_week,
|
||||
'one_month': one_month,
|
||||
'two_month': two_month,
|
||||
'three_month': three_month,
|
||||
'six_month': six_month,
|
||||
'one_year': one_year
|
||||
}
|
||||
|
Reference in New Issue
Block a user