mirror of
https://github.com/bspeice/metrik
synced 2024-11-23 07:38:09 -05:00
Switch the configuration system
I like this one much better.
This commit is contained in:
parent
7e81f78a6d
commit
ce71c4bc94
@ -3,7 +3,9 @@ from luigi import build
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
from dateutil.parser import parse
|
from dateutil.parser import parse
|
||||||
|
from six import StringIO
|
||||||
|
|
||||||
|
from metrik.conf import get_config
|
||||||
from metrik.flows.rates_flow import LiborFlow
|
from metrik.flows.rates_flow import LiborFlow
|
||||||
from metrik.flows.equities_flow import EquitiesFlow
|
from metrik.flows.equities_flow import EquitiesFlow
|
||||||
from metrik import __version__
|
from metrik import __version__
|
||||||
@ -49,6 +51,7 @@ def handle_commandline():
|
|||||||
parser.add_argument('-f', '--flow', dest='flow', help='The flow to be run')
|
parser.add_argument('-f', '--flow', dest='flow', help='The flow to be run')
|
||||||
parser.add_argument('-l', '--list-flows', dest='list', action='store_true',
|
parser.add_argument('-l', '--list-flows', dest='list', action='store_true',
|
||||||
help='List all available flows to be run.')
|
help='List all available flows to be run.')
|
||||||
|
parser.add_argument('-o', '--config', action='store_true')
|
||||||
parser.add_argument('-v', '--version', action='version',
|
parser.add_argument('-v', '--version', action='version',
|
||||||
version=__version__)
|
version=__version__)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
@ -57,6 +60,11 @@ def handle_commandline():
|
|||||||
print(build_cron_file())
|
print(build_cron_file())
|
||||||
elif args.list:
|
elif args.list:
|
||||||
print(list_flows())
|
print(list_flows())
|
||||||
|
elif args.config:
|
||||||
|
config = get_config()
|
||||||
|
s = StringIO
|
||||||
|
config.write(s)
|
||||||
|
print(s.getvalue())
|
||||||
elif args.flow:
|
elif args.flow:
|
||||||
if type(args.present) is datetime:
|
if type(args.present) is datetime:
|
||||||
run_flow(flows[args.flow], args.present, True)
|
run_flow(flows[args.flow], args.present, True)
|
||||||
|
@ -1,16 +1,35 @@
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
# from six.moves.configparser import RawConfigParser
|
||||||
|
from ConfigParser import RawConfigParser
|
||||||
|
|
||||||
IS_TRAVIS = 'TRAVIS_BUILD_NUMBER' in os.environ
|
|
||||||
IS_PYTEST = hasattr(sys, '_called_from_test')
|
|
||||||
|
|
||||||
TEST = IS_PYTEST or IS_TRAVIS
|
def get_config_locations():
|
||||||
|
return ['/etc/metrik', os.path.expanduser('~/.metrik')]
|
||||||
|
|
||||||
USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:47.0) Gecko/20100101 Firefox/47.0"
|
|
||||||
MONGO_HOST = 'localhost'
|
|
||||||
MONGO_PORT = 27017
|
|
||||||
|
|
||||||
if TEST:
|
def get_default_conf():
|
||||||
MONGO_DATABASE = 'metrik'
|
cur_file_dir = os.path.dirname(os.path.realpath(__file__))
|
||||||
else:
|
return open(cur_file_dir + '/default.conf', 'r')
|
||||||
MONGO_DATABASE = 'metrik-test'
|
|
||||||
|
|
||||||
|
# Normally it's terrible practice to put static calls into the signature,
|
||||||
|
# but this is safe (for now) since the get_config_locations() won't change
|
||||||
|
# during a run. I.e. it starts up and that's the only time it's ever needed.
|
||||||
|
def get_config(extra_locations=get_config_locations()):
|
||||||
|
config = RawConfigParser()
|
||||||
|
|
||||||
|
config.readfp(get_default_conf())
|
||||||
|
|
||||||
|
conf_files = config.read(extra_locations)
|
||||||
|
for conf_file in conf_files:
|
||||||
|
config.readfp(open(conf_file, 'r'))
|
||||||
|
|
||||||
|
# Not a huge fan of when developers think they're smarter than the
|
||||||
|
# end-users, but I'm calling it a special case for testing
|
||||||
|
is_travis = 'TRAVIS_BUILD_NUMBER' in os.environ
|
||||||
|
is_pytest = hasattr(sys, '_called_from_test')
|
||||||
|
if is_pytest or is_travis:
|
||||||
|
config.set('metrik', 'mongo_database', 'metrik-test')
|
||||||
|
|
||||||
|
return config
|
||||||
|
11
metrik/default.conf
Normal file
11
metrik/default.conf
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
[metrik]
|
||||||
|
user_agent=Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:47.0) Gecko/20100101 Firefox/47.0
|
||||||
|
mongo_host=localhost
|
||||||
|
mongo_port=27017
|
||||||
|
mongo_database=metrik
|
||||||
|
|
||||||
|
[tradeking]
|
||||||
|
consumer_key=
|
||||||
|
consumer_secret=
|
||||||
|
oauth_token=
|
||||||
|
oauth_token_secret=
|
@ -1,13 +1,18 @@
|
|||||||
from luigi import Target
|
from luigi import Target
|
||||||
from pymongo import MongoClient
|
from pymongo import MongoClient
|
||||||
from metrik.conf import MONGO_HOST, MONGO_PORT, MONGO_DATABASE
|
from metrik.conf import get_config
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
class MongoTarget(Target):
|
class MongoTarget(Target):
|
||||||
|
|
||||||
def __init__(self, collection, id):
|
def __init__(self, collection, id):
|
||||||
self.connection = MongoClient(MONGO_HOST, MONGO_PORT)[MONGO_DATABASE]
|
config = get_config()
|
||||||
|
self.connection = MongoClient(
|
||||||
|
host=config.get('metrik', 'mongo_host'),
|
||||||
|
port=config.get('metrik', 'mongo_port'))[
|
||||||
|
config.get('metrik', 'mongo_database')
|
||||||
|
]
|
||||||
self.collection = self.connection[collection]
|
self.collection = self.connection[collection]
|
||||||
self.id = id
|
self.id = id
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ from pymongo import MongoClient
|
|||||||
|
|
||||||
from metrik.targets.mongo import MongoTarget
|
from metrik.targets.mongo import MongoTarget
|
||||||
from metrik.targets.noop import NoOpTarget
|
from metrik.targets.noop import NoOpTarget
|
||||||
from metrik.conf import MONGO_HOST, MONGO_PORT, MONGO_DATABASE
|
from metrik.conf import get_config
|
||||||
|
|
||||||
|
|
||||||
class MongoCreateTask(Task):
|
class MongoCreateTask(Task):
|
||||||
@ -70,52 +70,50 @@ class MongoNoBackCreateTask(MongoCreateTask):
|
|||||||
class MongoRateLimit(object):
|
class MongoRateLimit(object):
|
||||||
rate_limit_collection = 'rate_limit'
|
rate_limit_collection = 'rate_limit'
|
||||||
|
|
||||||
def __init__(self, service, limit, interval, max_tries=5, backoff=.5):
|
def __init__(self):
|
||||||
"""
|
config = get_config()
|
||||||
|
self.db = MongoClient(
|
||||||
:param present:
|
host=config.get('metrik', 'mongo_host'),
|
||||||
:type present: datetime.datetime
|
port=config.getint('metrik', 'mongo_port'))[
|
||||||
:param service:
|
config.get('metrik', 'mongo_database')
|
||||||
:param limit:
|
]
|
||||||
:param interval:
|
|
||||||
:type interval: datetime.timedelta
|
|
||||||
:param max_tries:
|
|
||||||
:param backoff:
|
|
||||||
"""
|
|
||||||
self.service = service
|
|
||||||
self.limit = limit
|
|
||||||
self.interval = interval
|
|
||||||
self.max_tries = max_tries
|
|
||||||
self.backoff = backoff
|
|
||||||
self.db = MongoClient(host=MONGO_HOST, port=MONGO_PORT)[MONGO_DATABASE]
|
|
||||||
|
|
||||||
def get_present(self):
|
def get_present(self):
|
||||||
return datetime.datetime.now()
|
return datetime.datetime.now()
|
||||||
|
|
||||||
def query_locks(self, present):
|
def query_locks(self, present, interval, service):
|
||||||
return self.db[self.rate_limit_collection].find(
|
return self.db[self.rate_limit_collection].find(
|
||||||
{'_created_at': {'$gt': present - self.interval},
|
{'_created_at': {'$gt': present - interval},
|
||||||
'service': self.service}).count()
|
'service': service}).count()
|
||||||
|
|
||||||
def save_lock(self, present):
|
def save_lock(self, present, service):
|
||||||
self.db[self.rate_limit_collection].save({
|
self.db[self.rate_limit_collection].save({
|
||||||
'_created_at': present, 'service': self.service
|
'_created_at': present, 'service': service
|
||||||
})
|
})
|
||||||
|
|
||||||
def sleep_until(self, present):
|
def sleep_until(self, present, interval, backoff):
|
||||||
future_time = present + self.interval * self.backoff
|
future_time = present + interval * backoff
|
||||||
return (future_time - present).total_seconds()
|
return (future_time - present).total_seconds()
|
||||||
|
|
||||||
def acquire_lock(self):
|
def acquire_lock(self, service, limit, interval, max_tries=5, backoff=.5):
|
||||||
num_tries = 0
|
num_tries = 0
|
||||||
while num_tries < self.max_tries:
|
while num_tries < max_tries:
|
||||||
num_tries += 1
|
num_tries += 1
|
||||||
num_locks = self.query_locks(self.get_present())
|
num_locks = self.query_locks(
|
||||||
if num_locks < self.limit:
|
self.get_present(),
|
||||||
self.save_lock(self.get_present())
|
interval,
|
||||||
|
service
|
||||||
|
)
|
||||||
|
|
||||||
|
if num_locks < limit:
|
||||||
|
self.save_lock(self.get_present(), service)
|
||||||
return True
|
return True
|
||||||
elif num_tries < self.max_tries:
|
elif num_tries < max_tries:
|
||||||
sleep_amount = self.sleep_until(self.get_present())
|
sleep_amount = self.sleep_until(
|
||||||
|
self.get_present(),
|
||||||
|
interval,
|
||||||
|
backoff
|
||||||
|
)
|
||||||
sleep(sleep_amount)
|
sleep(sleep_amount)
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
@ -6,3 +6,4 @@ pymongo>=3.2
|
|||||||
python-dateutil>=2.4.2
|
python-dateutil>=2.4.2
|
||||||
pandas>=0.17.1
|
pandas>=0.17.1
|
||||||
argparse>=1.1.0
|
argparse>=1.1.0
|
||||||
|
requests-oauthlib>=0.4.0
|
3
setup.py
3
setup.py
@ -18,7 +18,8 @@ setup(
|
|||||||
'pytz >= 2016.6.1',
|
'pytz >= 2016.6.1',
|
||||||
'python-dateutil >= 2.4.2',
|
'python-dateutil >= 2.4.2',
|
||||||
'pandas >= 0.17.1',
|
'pandas >= 0.17.1',
|
||||||
'argparse >= 1.1.0'
|
'argparse >= 1.1.0',
|
||||||
|
'requests-oauthlib >= 0.4.0'
|
||||||
],
|
],
|
||||||
setup_requires=[
|
setup_requires=[
|
||||||
'pytest_runner'
|
'pytest_runner'
|
||||||
|
@ -1,18 +1,22 @@
|
|||||||
from unittest import TestCase
|
from unittest import TestCase
|
||||||
from pymongo import MongoClient
|
from pymongo import MongoClient
|
||||||
|
|
||||||
from metrik.conf import MONGO_DATABASE, MONGO_PORT, MONGO_HOST
|
from metrik.conf import get_config
|
||||||
from metrik.targets.mongo import MongoTarget
|
from metrik.targets.mongo import MongoTarget
|
||||||
|
|
||||||
|
|
||||||
class MongoTest(TestCase):
|
class MongoTest(TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.client = MongoClient(MONGO_HOST, MONGO_PORT)
|
config = get_config()
|
||||||
self.db = self.client[MONGO_DATABASE]
|
self.client = MongoClient(
|
||||||
|
host=config.get('metrik', 'mongo_host'),
|
||||||
|
port=config.getint('metrik', 'mongo_port'))
|
||||||
|
self.db = self.client[config.get('metrik', 'mongo_database')]
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
super(MongoTest, self).tearDown()
|
super(MongoTest, self).tearDown()
|
||||||
self.client.drop_database(MONGO_DATABASE)
|
config = get_config()
|
||||||
|
self.client.drop_database(config.get('metrik', 'mongo_database'))
|
||||||
|
|
||||||
|
|
||||||
class MongoTestTest(MongoTest):
|
class MongoTestTest(MongoTest):
|
||||||
|
@ -15,42 +15,41 @@ class BaseTaskTest(TestCase):
|
|||||||
class RateLimitTest(MongoTest):
|
class RateLimitTest(MongoTest):
|
||||||
def test_save_creates_record(self):
|
def test_save_creates_record(self):
|
||||||
service = 'testing_ratelimit'
|
service = 'testing_ratelimit'
|
||||||
|
interval = timedelta(seconds=1)
|
||||||
assert self.db[MongoRateLimit.rate_limit_collection].count() == 0
|
assert self.db[MongoRateLimit.rate_limit_collection].count() == 0
|
||||||
|
|
||||||
present = datetime.now()
|
present = datetime.now()
|
||||||
onesec_back = present - timedelta(seconds=1)
|
onesec_back = present - timedelta(seconds=1)
|
||||||
ratelimit = MongoRateLimit(
|
ratelimit = MongoRateLimit()
|
||||||
service, 1, timedelta(seconds=1)
|
assert ratelimit.query_locks(onesec_back, interval, service) == 0
|
||||||
)
|
|
||||||
assert ratelimit.query_locks(onesec_back) == 0
|
|
||||||
|
|
||||||
ratelimit.save_lock(present)
|
ratelimit.save_lock(present, service)
|
||||||
assert self.db[MongoRateLimit.rate_limit_collection].count() == 1
|
assert self.db[MongoRateLimit.rate_limit_collection].count() == 1
|
||||||
assert ratelimit.query_locks(onesec_back) == 1
|
assert ratelimit.query_locks(onesec_back, interval, service) == 1
|
||||||
|
|
||||||
def test_save_creates_correct_service(self):
|
def test_save_creates_correct_service(self):
|
||||||
service_1 = 'testing_ratelimit_1'
|
service_1 = 'testing_ratelimit_1'
|
||||||
service_2 = 'testing_ratelimit_2'
|
service_2 = 'testing_ratelimit_2'
|
||||||
|
interval = timedelta(seconds=1)
|
||||||
|
|
||||||
ratelimit1 = MongoRateLimit(
|
ratelimit = MongoRateLimit()
|
||||||
service_1, 1, timedelta(seconds=1)
|
|
||||||
)
|
|
||||||
ratelimit2 = MongoRateLimit(
|
|
||||||
service_2, 1, timedelta(seconds=1)
|
|
||||||
)
|
|
||||||
|
|
||||||
present = datetime.now()
|
present = datetime.now()
|
||||||
assert self.db[MongoRateLimit.rate_limit_collection].count() == 0
|
assert self.db[MongoRateLimit.rate_limit_collection].count() == 0
|
||||||
assert ratelimit1.query_locks(present) == 0
|
assert ratelimit.query_locks(present, interval, service_1) == 0
|
||||||
assert ratelimit2.query_locks(present) == 0
|
assert ratelimit.query_locks(present, interval, service_2) == 0
|
||||||
|
|
||||||
ratelimit1.save_lock(present)
|
ratelimit.save_lock(present, service_1)
|
||||||
assert self.db[MongoRateLimit.rate_limit_collection].count() == 1
|
assert self.db[MongoRateLimit.rate_limit_collection].count() == 1
|
||||||
assert ratelimit1.query_locks(present) == 1
|
assert ratelimit.query_locks(present, interval, service_1) == 1
|
||||||
assert ratelimit2.query_locks(present) == 0
|
assert ratelimit.query_locks(present, interval, service_2) == 0
|
||||||
|
|
||||||
def test_acquire_lock_fails(self):
|
def test_acquire_lock_fails(self):
|
||||||
service = 'testing_ratelimit'
|
service = 'testing_ratelimit'
|
||||||
|
limit = 1
|
||||||
|
interval = timedelta(seconds=1)
|
||||||
|
max_tries = 1
|
||||||
|
backoff = 10
|
||||||
|
|
||||||
# The first scenario is as follows:
|
# The first scenario is as follows:
|
||||||
# We try to acquire a lock with 1 try, backoff is 10.
|
# We try to acquire a lock with 1 try, backoff is 10.
|
||||||
@ -60,19 +59,22 @@ class RateLimitTest(MongoTest):
|
|||||||
# lock and are only allowed one try.
|
# lock and are only allowed one try.
|
||||||
# Ultimately, we are testing that the 'fail immediately'
|
# Ultimately, we are testing that the 'fail immediately'
|
||||||
# switch gets triggered correctly
|
# switch gets triggered correctly
|
||||||
ratelimit = MongoRateLimit(
|
ratelimit = MongoRateLimit()
|
||||||
service, 1, timedelta(seconds=1), max_tries=1, backoff=10
|
|
||||||
)
|
|
||||||
|
|
||||||
start = datetime.now()
|
start = datetime.now()
|
||||||
ratelimit.save_lock(start)
|
ratelimit.save_lock(start, service)
|
||||||
did_acquire = ratelimit.acquire_lock()
|
did_acquire = ratelimit.acquire_lock(service, limit, interval,
|
||||||
|
max_tries, backoff)
|
||||||
end = datetime.now()
|
end = datetime.now()
|
||||||
assert not did_acquire
|
assert not did_acquire
|
||||||
assert (end - start).total_seconds() < 1
|
assert (end - start).total_seconds() < 1
|
||||||
|
|
||||||
def test_acquire_lock_succeeds(self):
|
def test_acquire_lock_succeeds(self):
|
||||||
service = 'testing_ratelimit'
|
service = 'testing_ratelimit'
|
||||||
|
limit = 1
|
||||||
|
interval = timedelta(seconds=1)
|
||||||
|
max_tries = 2
|
||||||
|
backoff = 1
|
||||||
|
|
||||||
# The first scenario is as follows:
|
# The first scenario is as follows:
|
||||||
# We try to acquire a lock with two tries, backoff is 1.
|
# We try to acquire a lock with two tries, backoff is 1.
|
||||||
@ -80,13 +82,12 @@ class RateLimitTest(MongoTest):
|
|||||||
# thus when we try to acquire on the first try, we should fail.
|
# thus when we try to acquire on the first try, we should fail.
|
||||||
# However, the backoff should kick in, and we acquire successfully
|
# However, the backoff should kick in, and we acquire successfully
|
||||||
# on the second try
|
# on the second try
|
||||||
ratelimit = MongoRateLimit(
|
ratelimit = MongoRateLimit()
|
||||||
service, 1, timedelta(seconds=1), max_tries=2, backoff=1
|
|
||||||
)
|
|
||||||
|
|
||||||
start = datetime.now()
|
start = datetime.now()
|
||||||
ratelimit.save_lock(start - timedelta(seconds=.5))
|
ratelimit.save_lock(start - timedelta(seconds=.5), service)
|
||||||
did_acquire = ratelimit.acquire_lock()
|
did_acquire = ratelimit.acquire_lock(service, limit, interval,
|
||||||
|
max_tries, backoff)
|
||||||
end = datetime.now()
|
end = datetime.now()
|
||||||
# Check that we acquired the lock
|
# Check that we acquired the lock
|
||||||
assert did_acquire
|
assert did_acquire
|
||||||
|
10
test/test_conf.py
Normal file
10
test/test_conf.py
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
from unittest import TestCase
|
||||||
|
|
||||||
|
from metrik.conf import get_config
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigurationTest(TestCase):
|
||||||
|
|
||||||
|
def test_config_returns_values(self):
|
||||||
|
config = get_config([])
|
||||||
|
assert config is not None
|
Loading…
Reference in New Issue
Block a user