diff --git a/metrik/__init__.py b/metrik/__init__.py index 59b1681..c3c34cf 100644 --- a/metrik/__init__.py +++ b/metrik/__init__.py @@ -1,2 +1,2 @@ -__version__ = '0.1.2' +__version__ = '0.2.0' __release__ = __version__ \ No newline at end of file diff --git a/metrik/batch.py b/metrik/batch.py index 6538099..6f7491c 100644 --- a/metrik/batch.py +++ b/metrik/batch.py @@ -11,8 +11,8 @@ flows = { } -def run_flow(flow_class, present): - build([flow_class(present=present)]) +def run_flow(flow_class, present, live): + build([flow_class(present=present, live=live)]) def build_cron_file(): @@ -30,10 +30,11 @@ def build_cron_file(): def list_flows(): - return "Avaiable:\n\t" + "\n\t".join(flows.keys()) + return "Available:\n\t" + "\n\t".join(flows.keys()) def handle_commandline(): + present_moment = datetime.now() parser = ArgumentParser(description='Capture ALL THE DATA off the Internet.') parser.add_argument('-c', '--cron', dest='cron', action='store_true', help='Build the cron file used to schedule' @@ -41,7 +42,7 @@ def handle_commandline(): parser.add_argument('-d', '--date', dest='present', help='Run a flow as if it was this time ' '(default: %(default)s).', - default=datetime.now()) + default=present_moment) parser.add_argument('-f', '--flow', dest='flow', help='The flow to be run') parser.add_argument('-l', '--list-flows', dest='list', action='store_true', help='List all available flows to be run.') @@ -53,9 +54,9 @@ def handle_commandline(): print(list_flows()) elif args.flow: if type(args.present) is datetime: - run_flow(flows[args.flow], args.present) + run_flow(flows[args.flow], args.present, True) else: - run_flow(flows[args.flow], parse(args.present)) + run_flow(flows[args.flow], parse(args.present), False) else: print("No actions specified, exiting.") diff --git a/metrik/flows/base.py b/metrik/flows/base.py index 0d42172..c2fae23 100644 --- a/metrik/flows/base.py +++ b/metrik/flows/base.py @@ -1,5 +1,5 @@ from luigi.task import WrapperTask -from luigi.parameter import DateMinuteParameter +from luigi.parameter import DateMinuteParameter, BoolParameter import pandas as pd from metrik.trading_days import is_trading_day @@ -7,6 +7,7 @@ from metrik.trading_days import is_trading_day class Flow(WrapperTask): present = DateMinuteParameter() + live = BoolParameter() def __init__(self, force=False, *args, **kwargs): super(Flow, self).__init__(*args, **kwargs) diff --git a/metrik/flows/equities_flow.py b/metrik/flows/equities_flow.py new file mode 100644 index 0000000..a55e9f9 --- /dev/null +++ b/metrik/flows/equities_flow.py @@ -0,0 +1,13 @@ +from metrik.flows.base import Flow, MarketClose +from metrik.tasks.nasdaq import NasdaqETFList, NasdaqCompanyList + + +class EquitiesFlow(Flow): + @staticmethod + def get_schedule(): + MarketClose() + + def _requires(self): + return [NasdaqETFList(current_datetime=self.present, live=self.live), + NasdaqCompanyList(current_datetime=self.present, + live=self.live)] \ No newline at end of file