diff --git a/metrik/flows/base.py b/metrik/flows/base.py index c2fae23..2cc6e74 100644 --- a/metrik/flows/base.py +++ b/metrik/flows/base.py @@ -1,28 +1,35 @@ -from luigi.task import WrapperTask +from luigi.task import Task from luigi.parameter import DateMinuteParameter, BoolParameter import pandas as pd from metrik.trading_days import is_trading_day -class Flow(WrapperTask): +class Flow(Task): present = DateMinuteParameter() live = BoolParameter() def __init__(self, force=False, *args, **kwargs): super(Flow, self).__init__(*args, **kwargs) self.force = force + self.did_run = False + + def complete(self): + return self.did_run @staticmethod def get_schedule(): raise NotImplementedError('Your flow should know when it should be run.') - def _requires(self): + def _run(self): raise NotImplementedError('I need to know what tasks should be run!') - def requires(self): + def run(self): if self.force or self.get_schedule().check_trigger(self.present): - return self._requires() + for yielded in self._run(): + yield yielded + + self.did_run = True class Schedule(object): diff --git a/metrik/flows/equities_flow.py b/metrik/flows/equities_flow.py index 6c3d188..fa4c501 100644 --- a/metrik/flows/equities_flow.py +++ b/metrik/flows/equities_flow.py @@ -1,13 +1,34 @@ from metrik.flows.base import Flow, MarketClose from metrik.tasks.nasdaq import NasdaqETFList, NasdaqCompanyList +from metrik.tasks.tradeking import Tradeking1mTimesales class EquitiesFlow(Flow): + def __init__(self, *args, **kwargs): + super(EquitiesFlow, self).__init__(*args, **kwargs) + @staticmethod def get_schedule(): return MarketClose() - def _requires(self): - return [NasdaqETFList(current_datetime=self.present, live=self.live), - NasdaqCompanyList(current_datetime=self.present, - live=self.live)] \ No newline at end of file + def _run(self): + # When we yield dependencies in `run` instead of `_requires`, + # they get executed dynamically and we can use their results inline + etfs = 
NasdaqETFList(current_datetime=self.present, live=self.live) + companies = NasdaqCompanyList(current_datetime=self.present, + live=self.live) + + yield etfs + yield companies + + tradeking_etfs = [Tradeking1mTimesales( + present=self.present.date(), + symbol=e['Symbol'] + ) for e in etfs.output().retrieve()['etfs']] + yield tradeking_etfs + + tradeking_companies = [Tradeking1mTimesales( + present=self.present.date(), + symbol=c['Symbol'] + ) for c in companies.output().retrieve()['companies']] + yield tradeking_companies diff --git a/metrik/tasks/base.py b/metrik/tasks/base.py index 2d6890c..b73a117 100644 --- a/metrik/tasks/base.py +++ b/metrik/tasks/base.py @@ -91,9 +91,8 @@ class MongoRateLimit(object): '_created_at': present, 'service': service }) - def sleep_until(self, present, interval, backoff): - future_time = present + interval * backoff - return (future_time - present).total_seconds() + def sleep_for(self, interval, backoff): + return interval.total_seconds() * backoff def acquire_lock(self, service, limit, interval, max_tries=5, backoff=.5): num_tries = 0 @@ -109,13 +108,7 @@ class MongoRateLimit(object): self.save_lock(self.get_present(), service) return True elif num_tries < max_tries: - sleep_amount = self.sleep_until( - self.get_present(), - interval, - backoff - ) + sleep_amount = self.sleep_for(interval, backoff) sleep(sleep_amount) return False - - diff --git a/metrik/tasks/tradeking.py b/metrik/tasks/tradeking.py index c3044d5..b0c4456 100644 --- a/metrik/tasks/tradeking.py +++ b/metrik/tasks/tradeking.py @@ -35,10 +35,8 @@ class Tradeking1mTimesales(MongoCreateTask): present = DateParameter() symbol = Parameter() - def acquire_lock(self, service, limit, interval, max_tries=5, backoff=.5): - return super(Tradeking1mTimesales, self).acquire_lock( - 'tradeking', 60, timedelta(minutes=1) - ) + def get_collection_name(self): + return 'tradeking_1min' @staticmethod def retrieve_data(present, symbol): diff --git a/test/tasks/test_base.py 
b/test/tasks/test_base.py index 4a69160..5054ca1 100644 --- a/test/tasks/test_base.py +++ b/test/tasks/test_base.py @@ -92,4 +92,39 @@ class RateLimitTest(MongoTest): # Check that we acquired the lock assert did_acquire # Check that we only used one backoff period - assert (end - start).total_seconds() < 2 + total_seconds = (end - start).total_seconds() + assert 1 < total_seconds < 2 + + def test_acquire_lock_succeeds_float(self): + # And do the whole thing all over again, but with a floating backoff + service = 'testing_ratelimit' + limit = 1 + interval = timedelta(seconds=1) + max_tries = 2 + backoff = 1.01 + + # The first scenario is as follows: + # We try to acquire a lock with two tries, backoff is 1.01. + # We put a single lock in initially (a half second in the past), + # thus when we try to acquire on the first try, we should fail. + # However, the backoff should kick in, and we acquire successfully + # on the second try + ratelimit = MongoRateLimit() + + start = datetime.now() + ratelimit.save_lock(start - timedelta(seconds=.5), service) + did_acquire = ratelimit.acquire_lock(service, limit, interval, + max_tries, backoff) + end = datetime.now() + # Check that we acquired the lock + assert did_acquire + # Check that we only used one backoff period + total_seconds = (end - start).total_seconds() + assert 1 < total_seconds < 2 + + def test_sleep_for_gives_correct_time(self): + ratelimit = MongoRateLimit() + + assert ratelimit.sleep_for(timedelta(seconds=1), 1) == 1 + assert ratelimit.sleep_for(timedelta(seconds=1), 2) == 2 + assert ratelimit.sleep_for(timedelta(seconds=1), 1.1) == 1.1