Power session - tests / mocking / overwriting
Added default to the package init. Redid testing suite to use unittest. Allow headers to be overwritten.
This commit is contained in:
parent
0e2aee73cf
commit
5ee6c1579f
@ -1 +1 @@
|
||||
from core import BackupCeligo
|
||||
from core import BackupCeligo, DEFAULT_BASE_URL
|
||||
|
@ -10,30 +10,63 @@ from slugify import slugify
|
||||
import prompt
|
||||
|
||||
L = logging.getLogger(__name__)
|
||||
DEFAULT_BASE_URL = "https://api.integrator.io/v1/"
|
||||
|
||||
|
||||
class BackupCeligo(object):
|
||||
"""
|
||||
This module interacts with the Celigo API.
|
||||
"""
|
||||
|
||||
def __init__(self, data_dir, api_key, base_url=None):
|
||||
def __init__(self,
|
||||
data_dir,
|
||||
api_key,
|
||||
headers=None,
|
||||
base_url=None,):
|
||||
"""
|
||||
:param data_dir: The directory to read and store JSON files
|
||||
for imports and configurations.
|
||||
:param api_key: String of api key to add to the header for
|
||||
the api calls.
|
||||
:param headers: Single level dict of headers to add the the
|
||||
api call session.
|
||||
:param base_url: Url for the base of the api to call to.
|
||||
Default is None, which will just fill in celigo.
|
||||
"""
|
||||
if not base_url:
|
||||
base_url = "https://api.integrator.io/v1/"
|
||||
base_url = DEFAULT_BASE_URL
|
||||
|
||||
self.api_key = api_key
|
||||
self.data_dir = data_dir
|
||||
self.base_url = base_url
|
||||
self.imports_cache = {}
|
||||
self.setup_headers(headers)
|
||||
self.setup_requests_session()
|
||||
|
||||
def setup_headers(self, headers=None):
|
||||
"""
|
||||
Setup self.headers so that it has, at a minimum
|
||||
Authorization, and Content-Type
|
||||
Allow any others to be updated.
|
||||
:param headers: Single Level dict of headers
|
||||
"""
|
||||
_headers = {}
|
||||
_headers['Authorization'] = "Bearer {API_KEY}".format(
|
||||
API_KEY=self.api_key)
|
||||
_headers['Content-Type'] = "application/json"
|
||||
# Set any additional headers
|
||||
if headers:
|
||||
_headers.update(**headers)
|
||||
self.headers = _headers
|
||||
|
||||
def setup_requests_session(self):
|
||||
"""
|
||||
Setup a session using requests.
|
||||
This has the benefit of always using the Authorization header,
|
||||
and the Content-Type header, rather than configuring it on every
|
||||
request.
|
||||
"""
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update({
|
||||
"Authorization": "Bearer {API_KEY}".format(
|
||||
API_KEY=self.api_key),
|
||||
"Content-Type": "application/json"
|
||||
})
|
||||
self.session.headers.update(self.headers)
|
||||
|
||||
def ensure_directories_exist(self):
|
||||
""" Make the directory if it doesn't exist """
|
||||
@ -44,8 +77,6 @@ class BackupCeligo(object):
|
||||
if not os.path.exists(_dir):
|
||||
os.makedirs(_dir)
|
||||
|
||||
|
||||
|
||||
def _celigo_api_get(self, path):
|
||||
"""
|
||||
Make a GET request to the celigo API
|
||||
@ -201,11 +232,12 @@ class BackupCeligo(object):
|
||||
"""
|
||||
raise NotImplemented("NOT IMPLEMENTED YET")
|
||||
# yes this works but I don't want to allow this in testing.
|
||||
import_id, import_conf = self.imports_cache[key]
|
||||
response = self._celigo_api_put(
|
||||
"imports/{id}/distributed".format(
|
||||
id=import_id),
|
||||
import_conf)
|
||||
L.info("Restored: ")
|
||||
L.info(response.json()['_id'])
|
||||
L.info("")
|
||||
# TODO: Uncomment.
|
||||
# import_id, import_conf = self.imports_cache[key]
|
||||
# response = self._celigo_api_put(
|
||||
# "imports/{id}/distributed".format(
|
||||
# id=import_id),
|
||||
# import_conf)
|
||||
# L.info("Restored: ")
|
||||
# L.info(response.json()['_id'])
|
||||
# L.info("")
|
||||
|
@ -7,6 +7,7 @@ pyflakes==1.0.0
|
||||
pytest==2.9.1
|
||||
python-slugify==1.2.0
|
||||
requests==2.10.0
|
||||
requests-mock==1.0.0
|
||||
six==1.10.0
|
||||
stevedore==1.12.0
|
||||
Unidecode==0.4.19
|
||||
|
61
tests.py
61
tests.py
@ -1,16 +1,59 @@
|
||||
# testing imports
|
||||
import pytest
|
||||
import unittest
|
||||
import tempfile
|
||||
import os
|
||||
import requests_mock
|
||||
|
||||
# package imports
|
||||
import celigo
|
||||
|
||||
def test_needs_all_params():
|
||||
"""
|
||||
Test that we need to pass in both an api key and a data directory.
|
||||
"""
|
||||
with pytest.raises(TypeError):
|
||||
backup = celigo.BackupCeligo()
|
||||
api_key = "fake"
|
||||
data_dir = "fakedir"
|
||||
backup = celigo.BackupCeligo(data_dir, api_key)
|
||||
|
||||
class CeligoTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.FAKE_API_KEY = 'fake'
|
||||
|
||||
def test_needs_all_params(self):
|
||||
"""
|
||||
Test that we need to pass in both an api key and a data directory.
|
||||
"""
|
||||
with pytest.raises(TypeError):
|
||||
celigo.BackupCeligo()
|
||||
data_dir = "fakedir"
|
||||
celigo.BackupCeligo(data_dir, self.FAKE_API_KEY)
|
||||
|
||||
def test_ensure_directories(self):
|
||||
"""
|
||||
Test that the ensure_directories_exist works properly.
|
||||
"""
|
||||
tempdir = tempfile.mkdtemp('celigo_testing')
|
||||
bc = celigo.BackupCeligo(tempdir, self.FAKE_API_KEY)
|
||||
imports_dir = os.path.join(tempdir, "imports")
|
||||
connections_dir = os.path.join(tempdir, "connections")
|
||||
# Check that the directories don't exist already.
|
||||
self.assertFalse(
|
||||
os.path.exists(imports_dir),
|
||||
"imports dir exists")
|
||||
self.assertFalse(
|
||||
os.path.exists(connections_dir),
|
||||
"connections dir exists")
|
||||
# Make the directories.
|
||||
bc.ensure_directories_exist()
|
||||
self.assertTrue(
|
||||
os.path.exists(imports_dir),
|
||||
"Did not create proper directory"
|
||||
)
|
||||
self.assertTrue(
|
||||
os.path.exists(connections_dir),
|
||||
"Did not create proper directory"
|
||||
)
|
||||
|
||||
# Make sure nothing errors if the directories exist already.
|
||||
bc.ensure_directories_exist()
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_fake_requests(self, rqm):
|
||||
rqm.get(celigo.DEFAULT_BASE_URL, text='resp')
|
||||
tempdir = tempfile.mkdtemp('celigo_testing')
|
||||
bc = celigo.BackupCeligo(tempdir, self.FAKE_API_KEY)
|
||||
|
Loading…
Reference in New Issue
Block a user