Merge pull request #340 from runseb/flake8

Add flake8 test and fix pep8 style in expansion scripts
pull/376/head
Matt Butcher 9 years ago
commit b2f1e8b938

@ -41,7 +41,7 @@ clean:
rm -rf bin
.PHONY: test
test: build test-style test-unit
test: build test-style test-unit test-flake8
ROOTFS := rootfs
@ -64,6 +64,12 @@ test-style: lint vet
echo "gofmt check failed:"; gofmt -e -d -s $(GO_DIRS); exit 1; \
fi
.PHONY: test-flake8
test-flake8:
@echo Running flake8...
flake8 expansion
@echo ----------------
.PHONY: lint
lint:
@echo Running golint...

@ -19,6 +19,7 @@ dependencies:
- export PATH="$HOME/bin:$PATH" GLIDE_HOME="$HOME/.glide"
- cd $GOPATH/src/$IMPORT_PATH
- sudo pip install -r expansion/requirements.txt
- sudo pip install flake8
test:
override:

@ -6,7 +6,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@ -29,354 +29,364 @@ import schema_validation
def Expand(config, imports=None, env=None, validate_schema=False):
"""Expand the configuration with imports.
Args:
config: string, the raw config to be expanded.
imports: map from import file name, e.g. "helpers/constants.py" to
its contents.
env: map from string to string, the map of environment variable names
to their values
validate_schema: True to run schema validation; False otherwise
Returns:
YAML containing the expanded configuration and its layout, in the following
format:
config:
...
layout:
...
Raises:
ExpansionError: if there is any error occurred during expansion
"""
try:
return _Expand(config, imports=imports, env=env,
validate_schema=validate_schema)
except Exception as e:
# print traceback.format_exc()
raise ExpansionError('config', str(e))
"""Expand the configuration with imports.
Args:
config: string, the raw config to be expanded.
imports: map from import file name, e.g. "helpers/constants.py" to
its contents.
env: map from string to string, the map of environment variable names
to their values
validate_schema: True to run schema validation; False otherwise
Returns:
YAML containing the expanded configuration and its layout,
in the following format:
config:
...
layout:
...
Raises:
ExpansionError: if there is any error occurred during expansion
"""
try:
return _Expand(config, imports=imports, env=env,
validate_schema=validate_schema)
except Exception as e:
# print traceback.format_exc()
raise ExpansionError('config', str(e))
def _Expand(config, imports=None, env=None, validate_schema=False):
"""Expand the configuration with imports."""
FileAccessRedirector.redirect(imports)
yaml_config = None
try:
yaml_config = yaml.safe_load(config)
except yaml.scanner.ScannerError as e:
# Here we know that YAML parser could not parse the template we've given it.
# YAML raises a ScannerError that specifies which file had the problem, as
# well as line and column, but since we're giving it the template from
# string, error message contains <string>, which is not very helpful on the
# user end, so replace it with word "template" and make it obvious that YAML
# contains a syntactic error.
msg = str(e).replace('"<string>"', 'template')
raise Exception('Error parsing YAML: %s' % msg)
# Handle empty file case
if not yaml_config:
return ''
# If the configuration does not have ':' in it, the yaml_config will be a
# string. If this is the case just return the str. The code below it assumes
# yaml_config is a map for common cases.
if type(yaml_config) is str:
return yaml_config
"""Expand the configuration with imports."""
if not yaml_config.has_key('resources') or yaml_config['resources'] is None:
yaml_config['resources'] = []
FileAccessRedirector.redirect(imports)
config = {'resources': []}
layout = {'resources': []}
yaml_config = None
try:
yaml_config = yaml.safe_load(config)
except yaml.scanner.ScannerError as e:
# Here we know that YAML parser could not parse the template
# we've given it. YAML raises a ScannerError that specifies which file
# had the problem, as well as line and column, but since we're giving
# it the template from string, error message contains <string>, which
# is not very helpful on the user end, so replace it with word
# "template" and make it obvious that YAML contains a syntactic error.
msg = str(e).replace('"<string>"', 'template')
raise Exception('Error parsing YAML: %s' % msg)
# Handle empty file case
if not yaml_config:
return ''
# If the configuration does not have ':' in it, the yaml_config will be a
# string. If this is the case just return the str. The code below it
# assumes yaml_config is a map for common cases.
if type(yaml_config) is str:
return yaml_config
if 'resources' not in yaml_config or yaml_config['resources'] is None:
yaml_config['resources'] = []
config = {'resources': []}
layout = {'resources': []}
_ValidateUniqueNames(yaml_config['resources'])
# Iterate over all the resources to process.
for resource in yaml_config['resources']:
processed_resource = _ProcessResource(resource, imports, env,
validate_schema)
_ValidateUniqueNames(yaml_config['resources'])
config['resources'].extend(processed_resource['config']['resources'])
layout['resources'].append(processed_resource['layout'])
# Iterate over all the resources to process.
for resource in yaml_config['resources']:
processed_resource = _ProcessResource(resource, imports, env,
validate_schema)
result = {'config': config, 'layout': layout}
return yaml.safe_dump(result, default_flow_style=False)
config['resources'].extend(processed_resource['config']['resources'])
layout['resources'].append(processed_resource['layout'])
result = {'config': config, 'layout': layout}
return yaml.safe_dump(result, default_flow_style=False)
def _ProcessResource(resource, imports, env, validate_schema=False):
"""Processes a resource and expands if template.
Args:
resource: the resource to be processed, as a map.
imports: map from string to string, the map of imported files names
and contents
env: map from string to string, the map of environment variable names
to their values
validate_schema: True to run schema validation; False otherwise
Returns:
A map containing the layout and configuration of the expanded
resource and any sub-resources, in the format:
{'config': ..., 'layout': ...}
Raises:
ExpansionError: if there is any error occurred during expansion
"""
# A resource has to have to a name.
if 'name' not in resource:
raise ExpansionError(resource, 'Resource does not have a name.')
# A resource has to have a type.
if 'type' not in resource:
raise ExpansionError(resource, 'Resource does not have type defined.')
config = {'resources': []}
# Initialize layout with basic resource information.
layout = {'name': resource['name'],
'type': resource['type']}
if resource['type'] in imports:
# A template resource, which contains sub-resources.
expanded_template = ExpandTemplate(resource, imports,
env, validate_schema)
if expanded_template['resources']:
_ValidateUniqueNames(expanded_template['resources'],
resource['type'])
# Process all sub-resources of this template.
for resource_to_process in expanded_template['resources']:
processed_resource = _ProcessResource(resource_to_process,
imports, env,
validate_schema)
# Append all sub-resources to the config resources,
# and the resulting layout of sub-resources.
config['resources'].extend(processed_resource['config']
['resources'])
# Lazy-initialize resources key here because it is not set for
# non-template layouts.
if 'resources' not in layout:
layout['resources'] = []
layout['resources'].append(processed_resource['layout'])
if 'properties' in resource:
layout['properties'] = resource['properties']
else:
# A normal resource has only itself for config.
config['resources'] = [resource]
return {'config': config,
'layout': layout}
def _ProcessResource(resource, imports, env, validate_schema=False):
"""Processes a resource and expands if template.
Args:
resource: the resource to be processed, as a map.
imports: map from string to string, the map of imported files names
and contents
env: map from string to string, the map of environment variable names
to their values
validate_schema: True to run schema validation; False otherwise
Returns:
A map containing the layout and configuration of the expanded
resource and any sub-resources, in the format:
{'config': ..., 'layout': ...}
Raises:
ExpansionError: if there is any error occurred during expansion
"""
# A resource has to have to a name.
if not resource.has_key('name'):
raise ExpansionError(resource, 'Resource does not have a name.')
# A resource has to have a type.
if not resource.has_key('type'):
raise ExpansionError(resource, 'Resource does not have type defined.')
config = {'resources': []}
# Initialize layout with basic resource information.
layout = {'name': resource['name'],
'type': resource['type']}
if resource['type'] in imports:
# A template resource, which contains sub-resources.
expanded_template = ExpandTemplate(resource, imports, env, validate_schema)
if expanded_template['resources']:
_ValidateUniqueNames(expanded_template['resources'], resource['type'])
# Process all sub-resources of this template.
for resource_to_process in expanded_template['resources']:
processed_resource = _ProcessResource(resource_to_process, imports, env,
validate_schema)
# Append all sub-resources to the config resources, and the resulting
# layout of sub-resources.
config['resources'].extend(processed_resource['config']['resources'])
def _ValidateUniqueNames(template_resources, template_name='config'):
"""Make sure that every resource name in the given template is unique."""
names = set()
# Validate that every resource name is unique
for resource in template_resources:
if 'name' in resource:
if resource['name'] in names:
raise ExpansionError(
resource,
'Resource name \'%s\' is not unique in %s.'
% (resource['name'], template_name))
names.add(resource['name'])
# If this resource doesn't have a name, we will report that error later
# Lazy-initialize resources key here because it is not set for
# non-template layouts.
if 'resources' not in layout:
layout['resources'] = []
layout['resources'].append(processed_resource['layout'])
if 'properties' in resource:
layout['properties'] = resource['properties']
else:
# A normal resource has only itself for config.
config['resources'] = [resource]
def ExpandTemplate(resource, imports, env, validate_schema=False):
"""Expands a template, calling expansion mechanism based on type.
Args:
resource: resource object, the resource that contains parameters to the
jinja file
imports: map from string to string, the map of imported files names
and contents
env: map from string to string, the map of environment variable names
to their values
validate_schema: True to run schema validation; False otherwise
Returns:
The final expanded template
Raises:
ExpansionError: if there is any error occurred during expansion
"""
source_file = resource['type']
path = resource['type']
# Look for Template in imports.
if source_file not in imports:
raise ExpansionError(
source_file,
'Unable to find source file %s in imports.' % (source_file))
# source_file could be a short version of the template
# say github short name) so we need to potentially map this into
# the fully resolvable name.
if 'path' in imports[source_file] and imports[source_file]['path']:
path = imports[source_file]['path']
resource['imports'] = imports
# Populate the additional environment variables.
if env is None:
env = {}
env['name'] = resource['name']
env['type'] = resource['type']
resource['env'] = env
schema = source_file + '.schema'
if validate_schema and schema in imports:
properties = resource['properties'] if 'properties' in resource else {}
try:
resource['properties'] = schema_validation.Validate(
properties, schema, source_file, imports)
except schema_validation.ValidationErrors as e:
raise ExpansionError(resource['name'], e.message)
if path.endswith('jinja') or path.endswith('yaml'):
expanded_template = ExpandJinja(
source_file, imports[source_file]['content'], resource, imports)
elif path.endswith('py'):
# This is a Python template.
expanded_template = ExpandPython(
imports[source_file]['content'], source_file, resource)
else:
# The source file is not a jinja file or a python file.
# This in fact should never happen due to the IsTemplate check above.
raise ExpansionError(
resource['source'],
'Unsupported source file: %s.' % (source_file))
return {'config': config,
'layout': layout}
parsed_template = yaml.safe_load(expanded_template)
if parsed_template is None or 'resources' not in parsed_template:
raise ExpansionError(resource['type'],
'Template did not return a \'resources:\' field.')
def _ValidateUniqueNames(template_resources, template_name='config'):
"""Make sure that every resource name in the given template is unique."""
names = set()
# Validate that every resource name is unique
for resource in template_resources:
if 'name' in resource:
if resource['name'] in names:
raise ExpansionError(
resource,
'Resource name \'%s\' is not unique in %s.' % (resource['name'],
template_name))
names.add(resource['name'])
# If this resource doesn't have a name, we will report that error later
return parsed_template
def ExpandTemplate(resource, imports, env, validate_schema=False):
"""Expands a template, calling expansion mechanism based on type.
Args:
resource: resource object, the resource that contains parameters to the
jinja file
imports: map from string to string, the map of imported files names
and contents
env: map from string to string, the map of environment variable names
to their values
validate_schema: True to run schema validation; False otherwise
Returns:
The final expanded template
Raises:
ExpansionError: if there is any error occurred during expansion
"""
source_file = resource['type']
path = resource['type']
# Look for Template in imports.
if source_file not in imports:
raise ExpansionError(
source_file,
'Unable to find source file %s in imports.' % (source_file))
# source_file could be a short version of the template (say github short name)
# so we need to potentially map this into the fully resolvable name.
if 'path' in imports[source_file] and imports[source_file]['path']:
path = imports[source_file]['path']
resource['imports'] = imports
# Populate the additional environment variables.
if env is None:
env = {}
env['name'] = resource['name']
env['type'] = resource['type']
resource['env'] = env
def ExpandJinja(file_name, source_template, resource, imports):
"""Render the jinja template using jinja libraries.
Args:
file_name:
string, the file name.
source_template:
string, the content of jinja file to be render
resource:
resource object, the resource that contains parameters to the
jinja file
imports:
map from string to map {name, path}, the map of imported
files names fully resolved path and contents
Returns:
The final expanded template
Raises:
ExpansionError in case we fail to expand the Jinja2 template.
"""
schema = source_file + '.schema'
if validate_schema and schema in imports:
properties = resource['properties'] if 'properties' in resource else {}
try:
resource['properties'] = schema_validation.Validate(
properties, schema, source_file, imports)
except schema_validation.ValidationErrors as e:
raise ExpansionError(resource['name'], e.message)
if path.endswith('jinja') or path.endswith('yaml'):
expanded_template = ExpandJinja(
source_file, imports[source_file]['content'], resource, imports)
elif path.endswith('py'):
# This is a Python template.
expanded_template = ExpandPython(
imports[source_file]['content'], source_file, resource)
else:
# The source file is not a jinja file or a python file.
# This in fact should never happen due to the IsTemplate check above.
raise ExpansionError(
resource['source'],
'Unsupported source file: %s.' % (source_file))
parsed_template = yaml.safe_load(expanded_template)
if parsed_template is None or 'resources' not in parsed_template:
raise ExpansionError(resource['type'],
'Template did not return a \'resources:\' field.')
return parsed_template
env = jinja2.Environment(loader=jinja2.DictLoader(imports))
template = env.from_string(source_template)
def ExpandJinja(file_name, source_template, resource, imports):
"""Render the jinja template using jinja libraries.
Args:
file_name: string, the file name.
source_template: string, the content of jinja file to be render
resource: resource object, the resource that contains parameters to the
jinja file
imports: map from string to map {name, path}, the map of imported files names
fully resolved path and contents
Returns:
The final expanded template
Raises:
ExpansionError in case we fail to expand the Jinja2 template.
"""
try:
env = jinja2.Environment(loader=jinja2.DictLoader(imports))
template = env.from_string(source_template)
if (resource.has_key('properties') or resource.has_key('env') or
resource.has_key('imports')):
return template.render(resource)
else:
return template.render()
except Exception:
st = 'Exception in %s\n%s'%(file_name, traceback.format_exc())
raise ExpansionError(file_name, st)
if ('properties' in resource or 'env' in resource or
'imports' in resource):
return template.render(resource)
else:
return template.render()
except Exception:
st = 'Exception in %s\n%s' % (file_name, traceback.format_exc())
raise ExpansionError(file_name, st)
def ExpandPython(python_source, file_name, params):
"""Run python script to get the expanded template.
"""Run python script to get the expanded template.
Args:
python_source: string, the python source file to run
file_name: string, the name of the python source file
params: object that contains 'imports' and 'params', the parameters to
the python script
Returns:
The final expanded template.
"""
Args:
python_source: string, the python source file to run
file_name: string, the name of the python source file
params: object that contains 'imports' and 'params', the parameters to
the python script
Returns:
The final expanded template.
"""
try:
# Compile the python code to be run.
constructor = {}
compiled_code = compile(python_source, '<string>', 'exec')
exec compiled_code in constructor # pylint: disable=exec-used
try:
# Compile the python code to be run.
constructor = {}
compiled_code = compile(python_source, '<string>', 'exec')
exec compiled_code in constructor # pylint: disable=exec-used
# Construct the parameters to the python script.
evaluation_context = PythonEvaluationContext(params)
# Construct the parameters to the python script.
evaluation_context = PythonEvaluationContext(params)
return constructor['GenerateConfig'](evaluation_context)
except Exception:
st = 'Exception in %s\n%s' % (file_name, traceback.format_exc())
raise ExpansionError(file_name, st)
return constructor['GenerateConfig'](evaluation_context)
except Exception:
st = 'Exception in %s\n%s' % (file_name, traceback.format_exc())
raise ExpansionError(file_name, st)
class PythonEvaluationContext(object):
"""The python evaluation context.
"""The python evaluation context.
Attributes:
params -- the parameters to be used in the expansion
"""
Attributes:
params -- the parameters to be used in the expansion
"""
def __init__(self, params):
if params.has_key('properties'):
self.properties = params['properties']
else:
self.properties = None
def __init__(self, params):
if 'properties' in params:
self.properties = params['properties']
else:
self.properties = None
if params.has_key('imports'):
self.imports = params['imports']
else:
self.imports = None
if 'imports' in params:
self.imports = params['imports']
else:
self.imports = None
if params.has_key('env'):
self.env = params['env']
else:
self.env = None
if 'env' in params:
self.env = params['env']
else:
self.env = None
class ExpansionError(Exception):
"""Exception raised for errors during expansion process.
"""Exception raised for errors during expansion process.
Attributes:
resource: the resource processed that results in the error
message: the detailed message of the error
"""
Attributes:
resource: the resource processed that results in the error
message: the detailed message of the error
"""
def __init__(self, resource, message):
self.resource = resource
self.message = message + ' Resource: ' + str(resource)
super(ExpansionError, self).__init__(self.message)
def __init__(self, resource, message):
self.resource = resource
self.message = message + ' Resource: ' + str(resource)
super(ExpansionError, self).__init__(self.message)
def main():
if len(sys.argv) < 2:
print >> sys.stderr, 'No input specified.'
sys.exit(1)
template = sys.argv[1]
idx = 2
imports = {}
while idx < len(sys.argv):
if idx + 1 == len(sys.argv):
print >>sys.stderr, 'Invalid import definition at argv pos %d' % idx
sys.exit(1)
name = sys.argv[idx]
path = sys.argv[idx + 1]
value = sys.argv[idx + 2]
imports[name] = {'content': value, 'path': path}
idx += 3
env = {}
env['deployment'] = os.environ['DEPLOYMENT_NAME']
env['project'] = os.environ['PROJECT']
validate_schema = 'VALIDATE_SCHEMA' in os.environ
# Call the expansion logic to actually expand the template.
print Expand(template, imports, env=env, validate_schema=validate_schema)
if len(sys.argv) < 2:
print >> sys.stderr, 'No input specified.'
sys.exit(1)
template = sys.argv[1]
idx = 2
imports = {}
while idx < len(sys.argv):
if idx + 1 == len(sys.argv):
print >>sys.stderr, 'Invalid import definition at argv pos %d' \
% idx
sys.exit(1)
name = sys.argv[idx]
path = sys.argv[idx + 1]
value = sys.argv[idx + 2]
imports[name] = {'content': value, 'path': path}
idx += 3
env = {}
env['deployment'] = os.environ['DEPLOYMENT_NAME']
env['project'] = os.environ['PROJECT']
validate_schema = 'VALIDATE_SCHEMA' in os.environ
# Call the expansion logic to actually expand the template.
print Expand(template, imports, env=env, validate_schema=validate_schema)
if __name__ == '__main__':
main()
main()

@ -4,7 +4,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@ -21,486 +21,491 @@ import yaml
def GetFilePath():
"""Find our source and data files."""
return os.path.dirname(os.path.abspath(__file__))
"""Find our source and data files."""
return os.path.dirname(os.path.abspath(__file__))
def ReadTestFile(filename):
"""Returns contents of a file from the test/ directory."""
"""Returns contents of a file from the test/ directory."""
full_path = GetFilePath() + '/../test/templates/' + filename
test_file = open(full_path, 'r')
return test_file.read()
full_path = GetFilePath() + '/../test/templates/' + filename
test_file = open(full_path, 'r')
return test_file.read()
def GetTestBasePath(filename):
"""Returns the base path of a file from the testdata/ directory."""
"""Returns the base path of a file from the testdata/ directory."""
full_path = GetFilePath() + '/../test/templates/' + filename
return os.path.dirname(full_path)
full_path = GetFilePath() + '/../test/templates/' + filename
return os.path.dirname(full_path)
class ExpansionTest(unittest.TestCase):
"""Tests basic functionality of the template expansion library."""
"""Tests basic functionality of the template expansion library."""
EMPTY_RESPONSE = 'config:\n resources: []\nlayout:\n resources: []\n'
EMPTY_RESPONSE = 'config:\n resources: []\nlayout:\n resources: []\n'
def testEmptyExpansion(self):
template = ''
expanded_template = expansion.Expand(
template)
def testEmptyExpansion(self):
template = ''
expanded_template = expansion.Expand(
template)
self.assertEqual('', expanded_template)
self.assertEqual('', expanded_template)
def testNoResourcesList(self):
template = 'imports: [ test.import ]'
expanded_template = expansion.Expand(
template)
def testNoResourcesList(self):
template = 'imports: [ test.import ]'
expanded_template = expansion.Expand(
template)
self.assertEqual(self.EMPTY_RESPONSE, expanded_template)
self.assertEqual(self.EMPTY_RESPONSE, expanded_template)
def testResourcesListEmpty(self):
template = 'resources:'
expanded_template = expansion.Expand(
template)
def testResourcesListEmpty(self):
template = 'resources:'
expanded_template = expansion.Expand(
template)
self.assertEqual(self.EMPTY_RESPONSE, expanded_template)
self.assertEqual(self.EMPTY_RESPONSE, expanded_template)
def testSimpleNoExpansionTemplate(self):
template = ReadTestFile('simple.yaml')
def testSimpleNoExpansionTemplate(self):
template = ReadTestFile('simple.yaml')
expanded_template = expansion.Expand(
template)
expanded_template = expansion.Expand(
template)
result_file = ReadTestFile('simple_result.yaml')
self.assertEquals(result_file, expanded_template)
result_file = ReadTestFile('simple_result.yaml')
self.assertEquals(result_file, expanded_template)
def testJinjaExpansion(self):
template = ReadTestFile('jinja_template.yaml')
def testJinjaExpansion(self):
template = ReadTestFile('jinja_template.yaml')
imports = {}
imports['jinja_template.jinja'] = ReadTestFile('jinja_template.jinja')
imports = {}
imports['jinja_template.jinja'] = ReadTestFile('jinja_template.jinja')
expanded_template = expansion.Expand(
template, imports)
expanded_template = expansion.Expand(
template, imports)
result_file = ReadTestFile('jinja_template_result.yaml')
result_file = ReadTestFile('jinja_template_result.yaml')
self.assertEquals(result_file, expanded_template)
self.assertEquals(result_file, expanded_template)
def testJinjaWithNoParamsExpansion(self):
template = ReadTestFile('jinja_noparams.yaml')
def testJinjaWithNoParamsExpansion(self):
template = ReadTestFile('jinja_noparams.yaml')
imports = {}
imports['jinja_noparams.jinja'] = ReadTestFile('jinja_noparams.jinja')
imports = {}
imports['jinja_noparams.jinja'] = ReadTestFile('jinja_noparams.jinja')
expanded_template = expansion.Expand(
template, imports)
expanded_template = expansion.Expand(
template, imports)
result_file = ReadTestFile('jinja_noparams_result.yaml')
result_file = ReadTestFile('jinja_noparams_result.yaml')
self.assertEquals(result_file, expanded_template)
self.assertEquals(result_file, expanded_template)
def testPythonWithNoParamsExpansion(self):
template = ReadTestFile('python_noparams.yaml')
def testPythonWithNoParamsExpansion(self):
template = ReadTestFile('python_noparams.yaml')
imports = {}
imports['python_noparams.py'] = ReadTestFile('python_noparams.py')
imports = {}
imports['python_noparams.py'] = ReadTestFile('python_noparams.py')
expanded_template = expansion.Expand(
template, imports)
expanded_template = expansion.Expand(
template, imports)
result_file = ReadTestFile('python_noparams_result.yaml')
result_file = ReadTestFile('python_noparams_result.yaml')
self.assertEquals(result_file, expanded_template)
self.assertEquals(result_file, expanded_template)
def testPythonExpansion(self):
template = ReadTestFile('python_template.yaml')
def testPythonExpansion(self):
template = ReadTestFile('python_template.yaml')
imports = {}
imports['python_template.py'] = ReadTestFile('python_template.py')
imports = {}
imports['python_template.py'] = ReadTestFile('python_template.py')
expanded_template = expansion.Expand(
template, imports)
expanded_template = expansion.Expand(
template, imports)
result_file = ReadTestFile('python_template_result.yaml')
result_file = ReadTestFile('python_template_result.yaml')
self.assertEquals(result_file, expanded_template)
self.assertEquals(result_file, expanded_template)
def testPythonAndJinjaExpansion(self):
template = ReadTestFile('python_and_jinja_template.yaml')
def testPythonAndJinjaExpansion(self):
template = ReadTestFile('python_and_jinja_template.yaml')
imports = {}
imports['python_and_jinja_template.py'] = ReadTestFile(
'python_and_jinja_template.py')
imports = {}
imports['python_and_jinja_template.py'] = ReadTestFile(
'python_and_jinja_template.py')
imports['python_and_jinja_template.jinja'] = ReadTestFile(
'python_and_jinja_template.jinja')
imports['python_and_jinja_template.jinja'] = ReadTestFile(
'python_and_jinja_template.jinja')
expanded_template = expansion.Expand(
template, imports)
expanded_template = expansion.Expand(
template, imports)
result_file = ReadTestFile('python_and_jinja_template_result.yaml')
result_file = ReadTestFile('python_and_jinja_template_result.yaml')
self.assertEquals(result_file, expanded_template)
self.assertEquals(result_file, expanded_template)
def testNoImportErrors(self):
template = 'resources: \n- type: something.jinja\n name: something'
expansion.Expand(template, {})
def testNoImportErrors(self):
template = 'resources: \n- type: something.jinja\n name: something'
expansion.Expand(template, {})
def testInvalidConfig(self):
template = ReadTestFile('invalid_config.yaml')
def testInvalidConfig(self):
template = ReadTestFile('invalid_config.yaml')
try:
expansion.Expand(
template)
self.fail('Expansion should fail')
except expansion.ExpansionError as e:
self.assertNotIn(os.path.basename(expansion.__name__), e.message,
'Do not leak internals')
try:
expansion.Expand(
template)
self.fail('Expansion should fail')
except expansion.ExpansionError as e:
self.assertNotIn(os.path.basename(expansion.__name__), e.message,
'Do not leak internals')
def testJinjaWithImport(self):
template = ReadTestFile('jinja_template_with_import.yaml')
def testJinjaWithImport(self):
template = ReadTestFile('jinja_template_with_import.yaml')
imports = {}
imports['jinja_template_with_import.jinja'] = ReadTestFile(
'jinja_template_with_import.jinja')
imports['helpers/common.jinja'] = ReadTestFile(
'helpers/common.jinja')
imports = {}
imports['jinja_template_with_import.jinja'] = ReadTestFile(
'jinja_template_with_import.jinja')
imports['helpers/common.jinja'] = ReadTestFile(
'helpers/common.jinja')
yaml_template = yaml.safe_load(template)
yaml_template = yaml.safe_load(template)
expanded_template = expansion.Expand(
str(yaml_template), imports)
expanded_template = expansion.Expand(
str(yaml_template), imports)
result_file = ReadTestFile('jinja_template_with_import_result.yaml')
result_file = ReadTestFile('jinja_template_with_import_result.yaml')
self.assertEquals(result_file, expanded_template)
self.assertEquals(result_file, expanded_template)
def testJinjaWithInlinedFile(self):
template = ReadTestFile('jinja_template_with_inlinedfile.yaml')
def testJinjaWithInlinedFile(self):
template = ReadTestFile('jinja_template_with_inlinedfile.yaml')
imports = {}
imports['jinja_template_with_inlinedfile.jinja'] = ReadTestFile(
'jinja_template_with_inlinedfile.jinja')
imports['helpers/common.jinja'] = ReadTestFile(
'helpers/common.jinja')
imports = {}
imports['jinja_template_with_inlinedfile.jinja'] = ReadTestFile(
'jinja_template_with_inlinedfile.jinja')
imports['helpers/common.jinja'] = ReadTestFile(
'helpers/common.jinja')
imports['description_text.txt'] = ReadTestFile('description_text.txt')
imports['description_text.txt'] = ReadTestFile('description_text.txt')
yaml_template = yaml.safe_load(template)
yaml_template = yaml.safe_load(template)
expanded_template = expansion.Expand(
str(yaml_template), imports)
expanded_template = expansion.Expand(
str(yaml_template), imports)
result_file = ReadTestFile('jinja_template_with_inlinedfile_result.yaml')
result_file = \
ReadTestFile('jinja_template_with_inlinedfile_result.yaml')
self.assertEquals(result_file, expanded_template)
self.assertEquals(result_file, expanded_template)
def testPythonWithImport(self):
template = ReadTestFile('python_template_with_import.yaml')
def testPythonWithImport(self):
template = ReadTestFile('python_template_with_import.yaml')
imports = {}
imports['python_template_with_import.py'] = ReadTestFile(
'python_template_with_import.py')
imports = {}
imports['python_template_with_import.py'] = ReadTestFile(
'python_template_with_import.py')
imports['helpers/common.py'] = ReadTestFile('helpers/common.py')
imports['helpers/extra/common2.py'] = ReadTestFile(
'helpers/extra/common2.py')
imports['helpers/extra'] = ReadTestFile('helpers/extra/__init__.py')
imports['helpers/common.py'] = ReadTestFile('helpers/common.py')
imports['helpers/extra/common2.py'] = ReadTestFile(
'helpers/extra/common2.py')
imports['helpers/extra'] = ReadTestFile('helpers/extra/__init__.py')
yaml_template = yaml.safe_load(template)
yaml_template = yaml.safe_load(template)
expanded_template = expansion.Expand(
str(yaml_template), imports)
expanded_template = expansion.Expand(
str(yaml_template), imports)
result_file = ReadTestFile('python_template_with_import_result.yaml')
result_file = ReadTestFile('python_template_with_import_result.yaml')
self.assertEquals(result_file, expanded_template)
self.assertEquals(result_file, expanded_template)
def testPythonWithInlinedFile(self):
template = ReadTestFile('python_template_with_inlinedfile.yaml')
def testPythonWithInlinedFile(self):
template = ReadTestFile('python_template_with_inlinedfile.yaml')
imports = {}
imports['python_template_with_inlinedfile.py'] = ReadTestFile(
'python_template_with_inlinedfile.py')
imports = {}
imports['python_template_with_inlinedfile.py'] = ReadTestFile(
'python_template_with_inlinedfile.py')
imports['helpers/common.py'] = ReadTestFile('helpers/common.py')
imports['helpers/extra/common2.py'] = ReadTestFile(
'helpers/extra/common2.py')
imports['helpers/common.py'] = ReadTestFile('helpers/common.py')
imports['helpers/extra/common2.py'] = ReadTestFile(
'helpers/extra/common2.py')
imports['description_text.txt'] = ReadTestFile('description_text.txt')
imports['description_text.txt'] = ReadTestFile('description_text.txt')
yaml_template = yaml.safe_load(template)
yaml_template = yaml.safe_load(template)
expanded_template = expansion.Expand(
str(yaml_template), imports)
expanded_template = expansion.Expand(
str(yaml_template), imports)
result_file = ReadTestFile(
'python_template_with_inlinedfile_result.yaml')
result_file = ReadTestFile(
'python_template_with_inlinedfile_result.yaml')
self.assertEquals(result_file, expanded_template)
self.assertEquals(result_file, expanded_template)
def testPythonWithEnvironment(self):
template = ReadTestFile('python_template_with_env.yaml')
def testPythonWithEnvironment(self):
template = ReadTestFile('python_template_with_env.yaml')
imports = {}
imports['python_template_with_env.py'] = ReadTestFile(
'python_template_with_env.py')
imports = {}
imports['python_template_with_env.py'] = ReadTestFile(
'python_template_with_env.py')
env = {'project': 'my-project'}
env = {'project': 'my-project'}
expanded_template = expansion.Expand(
template, imports, env)
expanded_template = expansion.Expand(
template, imports, env)
result_file = ReadTestFile('python_template_with_env_result.yaml')
self.assertEquals(result_file, expanded_template)
result_file = ReadTestFile('python_template_with_env_result.yaml')
self.assertEquals(result_file, expanded_template)
def testJinjaWithEnvironment(self):
template = ReadTestFile('jinja_template_with_env.yaml')
def testJinjaWithEnvironment(self):
template = ReadTestFile('jinja_template_with_env.yaml')
imports = {}
imports['jinja_template_with_env.jinja'] = ReadTestFile(
'jinja_template_with_env.jinja')
imports = {}
imports['jinja_template_with_env.jinja'] = ReadTestFile(
'jinja_template_with_env.jinja')
env = {'project': 'test-project', 'deployment': 'test-deployment'}
env = {'project': 'test-project', 'deployment': 'test-deployment'}
expanded_template = expansion.Expand(
template, imports, env)
expanded_template = expansion.Expand(
template, imports, env)
result_file = ReadTestFile('jinja_template_with_env_result.yaml')
result_file = ReadTestFile('jinja_template_with_env_result.yaml')
self.assertEquals(result_file, expanded_template)
self.assertEquals(result_file, expanded_template)
def testMissingNameErrors(self):
template = 'resources: \n- type: something.jinja\n'
def testMissingNameErrors(self):
template = 'resources: \n- type: something.jinja\n'
try:
expansion.Expand(template, {})
self.fail('Expansion should fail')
except expansion.ExpansionError as e:
self.assertTrue('not have a name' in e.message)
try:
expansion.Expand(template, {})
self.fail('Expansion should fail')
except expansion.ExpansionError as e:
self.assertTrue('not have a name' in e.message)
def testDuplicateNamesErrors(self):
template = ReadTestFile('duplicate_names.yaml')
def testDuplicateNamesErrors(self):
template = ReadTestFile('duplicate_names.yaml')
try:
expansion.Expand(template, {})
self.fail('Expansion should fail')
except expansion.ExpansionError as e:
self.assertTrue(("Resource name 'my_instance' is not unique"
" in config.") in e.message)
try:
expansion.Expand(template, {})
self.fail('Expansion should fail')
except expansion.ExpansionError as e:
self.assertTrue(("Resource name 'my_instance' is not unique"
" in config.") in e.message)
def testDuplicateNamesInSubtemplates(self):
template = ReadTestFile('duplicate_names_in_subtemplates.yaml')
def testDuplicateNamesInSubtemplates(self):
template = ReadTestFile('duplicate_names_in_subtemplates.yaml')
imports = {}
imports['duplicate_names_in_subtemplates.jinja'] = ReadTestFile(
'duplicate_names_in_subtemplates.jinja')
imports = {}
imports['duplicate_names_in_subtemplates.jinja'] = ReadTestFile(
'duplicate_names_in_subtemplates.jinja')
try:
expansion.Expand(
template, imports)
self.fail('Expansion should fail')
except expansion.ExpansionError as e:
self.assertTrue('not unique in duplicate_names_in_subtemplates.jinja'
in e.message)
try:
expansion.Expand(
template, imports)
self.fail('Expansion should fail')
except expansion.ExpansionError as e:
self.assertTrue('not unique in \
duplicate_names_in_subtemplates.jinja'
in e.message)
def testDuplicateNamesMixedLevel(self):
template = ReadTestFile('duplicate_names_mixed_level.yaml')
def testDuplicateNamesMixedLevel(self):
template = ReadTestFile('duplicate_names_mixed_level.yaml')
imports = {}
imports['duplicate_names_B.jinja'] = ReadTestFile(
'duplicate_names_B.jinja')
imports['duplicate_names_C.jinja'] = ReadTestFile(
'duplicate_names_C.jinja')
imports = {}
imports['duplicate_names_B.jinja'] = ReadTestFile(
'duplicate_names_B.jinja')
imports['duplicate_names_C.jinja'] = ReadTestFile(
'duplicate_names_C.jinja')
expanded_template = expansion.Expand(
template, imports)
expanded_template = expansion.Expand(
template, imports)
result_file = ReadTestFile('duplicate_names_mixed_level_result.yaml')
result_file = ReadTestFile('duplicate_names_mixed_level_result.yaml')
self.assertEquals(result_file, expanded_template)
self.assertEquals(result_file, expanded_template)
def testDuplicateNamesParentChild(self):
template = ReadTestFile('duplicate_names_parent_child.yaml')
def testDuplicateNamesParentChild(self):
template = ReadTestFile('duplicate_names_parent_child.yaml')
imports = {}
imports['duplicate_names_B.jinja'] = ReadTestFile(
'duplicate_names_B.jinja')
imports = {}
imports['duplicate_names_B.jinja'] = ReadTestFile(
'duplicate_names_B.jinja')
expanded_template = expansion.Expand(
template, imports)
expanded_template = expansion.Expand(
template, imports)
result_file = ReadTestFile('duplicate_names_parent_child_result.yaml')
result_file = ReadTestFile('duplicate_names_parent_child_result.yaml')
self.assertEquals(result_file, expanded_template)
# Note, this template will fail in the frontend for duplicate resource names
self.assertEquals(result_file, expanded_template)
# Note, this template will fail in the frontend
# for duplicate resource names
def testTemplateReturnsEmpty(self):
template = ReadTestFile('no_resources.yaml')
def testTemplateReturnsEmpty(self):
template = ReadTestFile('no_resources.yaml')
imports = {}
imports['no_resources.py'] = ReadTestFile(
'no_resources.py')
try:
expansion.Expand(
template, imports)
self.fail('Expansion should fail')
except expansion.ExpansionError as e:
self.assertIn('Template did not return a \'resources:\' field.',
e.message)
self.assertIn('no_resources.py', e.message)
def testJinjaDefaultsSchema(self):
# Loop 1000 times to make sure we don't rely on dictionary ordering.
for unused_x in range(0, 1000):
template = ReadTestFile('jinja_defaults.yaml')
imports = {}
imports['jinja_defaults.jinja'] = ReadTestFile(
'jinja_defaults.jinja')
imports['jinja_defaults.jinja.schema'] = ReadTestFile(
'jinja_defaults.jinja.schema')
expanded_template = expansion.Expand(
template, imports,
validate_schema=True)
result_file = ReadTestFile('jinja_defaults_result.yaml')
self.assertEquals(result_file, expanded_template)
def testPythonDefaultsOverrideSchema(self):
template = ReadTestFile('python_schema.yaml')
imports = {}
imports['python_schema.py'] = ReadTestFile('python_schema.py')
imports['python_schema.py.schema'] = ReadTestFile('python_schema.py.schema')
env = {'project': 'my-project'}
expanded_template = expansion.Expand(
template, imports, env=env,
validate_schema=True)
result_file = ReadTestFile('python_schema_result.yaml')
self.assertEquals(result_file, expanded_template)
def testJinjaMissingRequiredPropertySchema(self):
template = ReadTestFile('jinja_missing_required.yaml')
imports = {}
imports['jinja_missing_required.jinja'] = ReadTestFile(
'jinja_missing_required.jinja')
imports['jinja_missing_required.jinja.schema'] = ReadTestFile(
'jinja_missing_required.jinja.schema')
try:
expansion.Expand(
template, imports,
validate_schema=True)
self.fail('Expansion error expected')
except expansion.ExpansionError as e:
self.assertIn('Invalid properties', e.message)
self.assertIn("'important' is a required property", e.message)
self.assertIn('jinja_missing_required_resource_name', e.message)
def testJinjaErrorFileMessage(self):
template = ReadTestFile('jinja_unresolved.yaml')
imports = {}
imports['jinja_unresolved.jinja'] = ReadTestFile('jinja_unresolved.jinja')
try:
expansion.Expand(
template, imports,
validate_schema=False)
self.fail('Expansion error expected')
except expansion.ExpansionError as e:
self.assertIn('jinja_unresolved.jinja', e.message)
def testJinjaMultipleErrorsSchema(self):
template = ReadTestFile('jinja_multiple_errors.yaml')
imports = {}
imports['jinja_multiple_errors.jinja'] = ReadTestFile(
'jinja_multiple_errors.jinja')
imports['jinja_multiple_errors.jinja.schema'] = ReadTestFile(
'jinja_multiple_errors.jinja.schema')
try:
expansion.Expand(
template, imports,
validate_schema=True)
self.fail('Expansion error expected')
except expansion.ExpansionError as e:
self.assertIn('Invalid properties', e.message)
self.assertIn("'a string' is not of type 'integer'", e.message)
self.assertIn("'d' is not one of ['a', 'b', 'c']", e.message)
self.assertIn("'longer than 10 chars' is too long", e.message)
self.assertIn("{'multipleOf': 2} is not allowed for 6", e.message)
def testPythonBadSchema(self):
template = ReadTestFile('python_bad_schema.yaml')
imports = {}
imports['python_bad_schema.py'] = ReadTestFile(
'python_bad_schema.py')
imports['python_bad_schema.py.schema'] = ReadTestFile(
'python_bad_schema.py.schema')
try:
expansion.Expand(
template, imports,
validate_schema=True)
self.fail('Expansion error expected')
except expansion.ExpansionError as e:
self.assertIn('Invalid schema', e.message)
self.assertIn("'int' is not valid under any of the given schemas",
e.message)
self.assertIn("'maximum' is a dependency of u'exclusiveMaximum'",
e.message)
self.assertIn("10 is not of type u'boolean'", e.message)
self.assertIn("'not a list' is not of type u'array'", e.message)
def testNoProperties(self):
template = ReadTestFile('no_properties.yaml')
imports = {}
imports['no_properties.py'] = ReadTestFile(
'no_properties.py')
expanded_template = expansion.Expand(
template, imports,
validate_schema=True)
result_file = ReadTestFile('no_properties_result.yaml')
self.assertEquals(result_file, expanded_template)
def testNestedTemplateSchema(self):
template = ReadTestFile('use_helper.yaml')
imports = {}
imports['use_helper.jinja'] = ReadTestFile(
'use_helper.jinja')
imports['use_helper.jinja.schema'] = ReadTestFile(
'use_helper.jinja.schema')
imports['helper.jinja'] = ReadTestFile(
'helper.jinja')
imports['helper.jinja.schema'] = ReadTestFile(
'helper.jinja.schema')
expanded_template = expansion.Expand(
template, imports,
validate_schema=True)
result_file = ReadTestFile('use_helper_result.yaml')
self.assertEquals(result_file, expanded_template)
imports = {}
imports['no_resources.py'] = ReadTestFile(
'no_resources.py')
try:
expansion.Expand(
template, imports)
self.fail('Expansion should fail')
except expansion.ExpansionError as e:
self.assertIn('Template did not return a \'resources:\' field.',
e.message)
self.assertIn('no_resources.py', e.message)
def testJinjaDefaultsSchema(self):
# Loop 1000 times to make sure we don't rely on dictionary ordering.
for unused_x in range(0, 1000):
template = ReadTestFile('jinja_defaults.yaml')
imports = {}
imports['jinja_defaults.jinja'] = ReadTestFile(
'jinja_defaults.jinja')
imports['jinja_defaults.jinja.schema'] = ReadTestFile(
'jinja_defaults.jinja.schema')
expanded_template = expansion.Expand(
template, imports,
validate_schema=True)
result_file = ReadTestFile('jinja_defaults_result.yaml')
self.assertEquals(result_file, expanded_template)
def testPythonDefaultsOverrideSchema(self):
template = ReadTestFile('python_schema.yaml')
imports = {}
imports['python_schema.py'] = ReadTestFile('python_schema.py')
imports['python_schema.py.schema'] = \
ReadTestFile('python_schema.py.schema')
env = {'project': 'my-project'}
expanded_template = expansion.Expand(
template, imports, env=env,
validate_schema=True)
result_file = ReadTestFile('python_schema_result.yaml')
self.assertEquals(result_file, expanded_template)
def testJinjaMissingRequiredPropertySchema(self):
template = ReadTestFile('jinja_missing_required.yaml')
imports = {}
imports['jinja_missing_required.jinja'] = ReadTestFile(
'jinja_missing_required.jinja')
imports['jinja_missing_required.jinja.schema'] = ReadTestFile(
'jinja_missing_required.jinja.schema')
try:
expansion.Expand(
template, imports,
validate_schema=True)
self.fail('Expansion error expected')
except expansion.ExpansionError as e:
self.assertIn('Invalid properties', e.message)
self.assertIn("'important' is a required property", e.message)
self.assertIn('jinja_missing_required_resource_name', e.message)
def testJinjaErrorFileMessage(self):
template = ReadTestFile('jinja_unresolved.yaml')
imports = {}
imports['jinja_unresolved.jinja'] = \
ReadTestFile('jinja_unresolved.jinja')
try:
expansion.Expand(
template, imports,
validate_schema=False)
self.fail('Expansion error expected')
except expansion.ExpansionError as e:
self.assertIn('jinja_unresolved.jinja', e.message)
def testJinjaMultipleErrorsSchema(self):
template = ReadTestFile('jinja_multiple_errors.yaml')
imports = {}
imports['jinja_multiple_errors.jinja'] = ReadTestFile(
'jinja_multiple_errors.jinja')
imports['jinja_multiple_errors.jinja.schema'] = ReadTestFile(
'jinja_multiple_errors.jinja.schema')
try:
expansion.Expand(
template, imports,
validate_schema=True)
self.fail('Expansion error expected')
except expansion.ExpansionError as e:
self.assertIn('Invalid properties', e.message)
self.assertIn("'a string' is not of type 'integer'", e.message)
self.assertIn("'d' is not one of ['a', 'b', 'c']", e.message)
self.assertIn("'longer than 10 chars' is too long", e.message)
self.assertIn("{'multipleOf': 2} is not allowed for 6", e.message)
def testPythonBadSchema(self):
template = ReadTestFile('python_bad_schema.yaml')
imports = {}
imports['python_bad_schema.py'] = ReadTestFile(
'python_bad_schema.py')
imports['python_bad_schema.py.schema'] = ReadTestFile(
'python_bad_schema.py.schema')
try:
expansion.Expand(
template, imports,
validate_schema=True)
self.fail('Expansion error expected')
except expansion.ExpansionError as e:
self.assertIn('Invalid schema', e.message)
self.assertIn("'int' is not valid under any of the given schemas",
e.message)
self.assertIn("'maximum' is a dependency of u'exclusiveMaximum'",
e.message)
self.assertIn("10 is not of type u'boolean'", e.message)
self.assertIn("'not a list' is not of type u'array'", e.message)
def testNoProperties(self):
template = ReadTestFile('no_properties.yaml')
imports = {}
imports['no_properties.py'] = ReadTestFile(
'no_properties.py')
expanded_template = expansion.Expand(
template, imports,
validate_schema=True)
result_file = ReadTestFile('no_properties_result.yaml')
self.assertEquals(result_file, expanded_template)
def testNestedTemplateSchema(self):
template = ReadTestFile('use_helper.yaml')
imports = {}
imports['use_helper.jinja'] = ReadTestFile(
'use_helper.jinja')
imports['use_helper.jinja.schema'] = ReadTestFile(
'use_helper.jinja.schema')
imports['helper.jinja'] = ReadTestFile(
'helper.jinja')
imports['helper.jinja.schema'] = ReadTestFile(
'helper.jinja.schema')
expanded_template = expansion.Expand(
template, imports,
validate_schema=True)
result_file = ReadTestFile('use_helper_result.yaml')
self.assertEquals(result_file, expanded_template)
if __name__ == '__main__':
unittest.main()
unittest.main()

@ -4,7 +4,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@ -19,31 +19,31 @@ from expansion import Expand
def main():
if len(sys.argv) < 2:
print >>sys.stderr, 'No template specified.'
sys.exit(1)
template = ''
imports = {}
try:
with open(sys.argv[1]) as f:
template = f.read()
for imp in sys.argv[2:]:
import_contents = ''
with open(imp) as f:
import_contents = f.read()
import_name = os.path.basename(imp)
imports[import_name] = import_contents
except IOError as e:
print 'IOException: ', str(e)
sys.exit(1)
if len(sys.argv) < 2:
print >>sys.stderr, 'No template specified.'
sys.exit(1)
template = ''
imports = {}
try:
with open(sys.argv[1]) as f:
template = f.read()
for imp in sys.argv[2:]:
import_contents = ''
with open(imp) as f:
import_contents = f.read()
import_name = os.path.basename(imp)
imports[import_name] = import_contents
except IOError as e:
print 'IOException: ', str(e)
sys.exit(1)
env = {}
env['deployment'] = os.environ['DEPLOYMENT_NAME']
env['project'] = os.environ['PROJECT']
validate_schema = 'VALIDATE_SCHEMA' in os.environ
env = {}
env['deployment'] = os.environ['DEPLOYMENT_NAME']
env['project'] = os.environ['PROJECT']
validate_schema = 'VALIDATE_SCHEMA' in os.environ
print Expand(template, imports, env=env, validate_schema=validate_schema)
print Expand(template, imports, env=env, validate_schema=validate_schema)
if __name__ == '__main__':
main()
main()

@ -4,7 +4,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@ -22,135 +22,141 @@ import sys
class AllowedImportsLoader(object):
# Dictionary with modules loaded from user provided imports
user_modules = {}
# Dictionary with modules loaded from user provided imports
user_modules = {}
@staticmethod
def get_filename(name):
return '%s.py' % name.replace('.', '/')
@staticmethod
def get_filename(name):
return '%s.py' % name.replace('.', '/')
def load_module(self, name, etc=None): # pylint: disable=unused-argument
"""Implements loader.load_module() for loading user provided imports."""
def load_module(self, name, etc=None): # pylint: disable=unused-argument
"""Implements loader.load_module()
for loading user provided imports."""
if name in AllowedImportsLoader.user_modules:
return AllowedImportsLoader.user_modules[name]
if name in AllowedImportsLoader.user_modules:
return AllowedImportsLoader.user_modules[name]
module = imp.new_module(name)
module = imp.new_module(name)
try:
data = FileAccessRedirector.allowed_imports[self.get_filename(name)]
except Exception: # pylint: disable=broad-except
return None
try:
data = \
FileAccessRedirector.allowed_imports[self.get_filename(name)]
except Exception: # pylint: disable=broad-except
return None
# Run the module code.
exec data in module.__dict__ # pylint: disable=exec-used
# Run the module code.
exec data in module.__dict__ # pylint: disable=exec-used
AllowedImportsLoader.user_modules[name] = module
AllowedImportsLoader.user_modules[name] = module
# We need to register the module in module registry, since new_module
# doesn't do this, but we need it for hierarchical references.
sys.modules[name] = module
# We need to register the module in module registry, since new_module
# doesn't do this, but we need it for hierarchical references.
sys.modules[name] = module
# If this module has children load them recursively.
if name in FileAccessRedirector.parents:
for child in FileAccessRedirector.parents[name]:
full_name = name + '.' + child
self.load_module(full_name)
# If we have helpers/common.py package, then for it to be successfully
# resolved helpers.common name must resolvable, hence, once we load
# child package we attach it to parent module immeadiately.
module.__dict__[child] = AllowedImportsLoader.user_modules[full_name]
return module
# If this module has children load them recursively.
if name in FileAccessRedirector.parents:
for child in FileAccessRedirector.parents[name]:
full_name = name + '.' + child
self.load_module(full_name)
# If we have helpers/common.py package,
# then for it to be successfully resolved helpers.common name
# must resolvable, hence, once we load
# child package we attach it to parent module immeadiately.
module.__dict__[child] = \
AllowedImportsLoader.user_modules[full_name]
return module
class AllowedImportsHandler(object):
def find_module(self, name, path=None): # pylint: disable=unused-argument
filename = AllowedImportsLoader.get_filename(name)
def find_module(self, name, path=None): # pylint: disable=unused-argument
filename = AllowedImportsLoader.get_filename(name)
if filename in FileAccessRedirector.allowed_imports:
return AllowedImportsLoader()
else:
return None
def process_imports(imports):
"""Processes the imports by copying them and adding necessary parent packages.
Copies the imports and then for all the hierarchical packages creates
dummy entries for those parent packages, so that hierarchical imports
can be resolved. In the process parent child relationship map is built.
For example: helpers/extra/common.py will generate helpers, helpers.extra
and helpers.extra.common packages along with related .py files.
Args:
imports: map of files to their relative paths.
Returns:
dictionary of imports to their contents and parent-child pacakge
relationship map.
"""
# First clone all the existing ones.
ret = {}
parents = {}
for k in imports:
ret[k] = imports[k]
# Now build the hierarchical modules.
for k in imports.keys():
path = imports[k]['path']
if path.endswith('.jinja'):
continue
# Normalize paths and trim .py extension, if any.
normalized = os.path.splitext(os.path.normpath(path))[0]
# If this is actually a path and not an absolute name, split it and process
# the hierarchical packages.
if sep in normalized:
parts = normalized.split(sep)
# Create dummy file entries for package levels and also retain
# parent-child relationships.
for i in xrange(0, len(parts)-1):
# Generate the partial package path.
path = os.path.join(parts[0], *parts[1:i+1])
# __init__.py file might have been provided and non-empty by the user.
if path not in ret:
# exec requires at least new line to be present to successfully
# compile the file.
ret[path + '.py'] = '\n'
else:
# To simplify our code, we'll store both versions in that case, since
# loader code expects files with .py extension.
ret[path + '.py'] = ret[path]
# Generate fully qualified package name.
fqpn = '.'.join(parts[0:i+1])
if fqpn in parents:
parents[fqpn].append(parts[i+1])
if filename in FileAccessRedirector.allowed_imports:
return AllowedImportsLoader()
else:
parents[fqpn] = [parts[i+1]]
return ret, parents
return None
class FileAccessRedirector(object):
# Dictionary with user provided imports.
allowed_imports = {}
# Dictionary that shows parent child relationships, key is the parent, value
# is the list of child packages.
parents = {}
@staticmethod
def redirect(imports):
"""Restricts imports and builtin 'open' to the set of user provided imports.
def process_imports(imports):
"""Processes the imports by copying them and adding necessary parent packages.
Imports already available in sys.modules will continue to be available.
Copies the imports and then for all the hierarchical packages creates
dummy entries for those parent packages, so that hierarchical imports
can be resolved. In the process parent child relationship map is built.
For example: helpers/extra/common.py will generate helpers,
helpers.extra and helpers.extra.common packages
along with related .py files.
Args:
imports: map from string to string, the map of imported files names
and contents.
imports: map of files to their relative paths.
Returns:
dictionary of imports to their contents and parent-child pacakge
relationship map.
"""
if imports is not None:
imps, parents = process_imports(imports)
FileAccessRedirector.allowed_imports = imps
FileAccessRedirector.parents = parents
# First clone all the existing ones.
ret = {}
parents = {}
for k in imports:
ret[k] = imports[k]
# Now build the hierarchical modules.
for k in imports.keys():
path = imports[k]['path']
if path.endswith('.jinja'):
continue
# Normalize paths and trim .py extension, if any.
normalized = os.path.splitext(os.path.normpath(path))[0]
# If this is actually a path and not an absolute name,
# split it and process the hierarchical packages.
if sep in normalized:
parts = normalized.split(sep)
# Create dummy file entries for package levels and also retain
# parent-child relationships.
for i in xrange(0, len(parts)-1):
# Generate the partial package path.
path = os.path.join(parts[0], *parts[1:i+1])
# __init__.py file might have been provided and
# non-empty by the user.
if path not in ret:
# exec requires at least new line to be present
# to successfully compile the file.
ret[path + '.py'] = '\n'
else:
# To simplify our code, we'll store both versions
# in that case, since loader code expects files
# with .py extension.
ret[path + '.py'] = ret[path]
# Generate fully qualified package name.
fqpn = '.'.join(parts[0:i+1])
if fqpn in parents:
parents[fqpn].append(parts[i+1])
else:
parents[fqpn] = [parts[i+1]]
return ret, parents
# Prepend our module handler before standard ones.
sys.meta_path = [AllowedImportsHandler()] + sys.meta_path
class FileAccessRedirector(object):
# Dictionary with user provided imports.
allowed_imports = {}
# Dictionary that shows parent child relationships,
# key is the parent, value is the list of child packages.
parents = {}
@staticmethod
def redirect(imports):
"""Restricts imports and builtin 'open' to the set of user provided imports.
Imports already available in sys.modules will continue to be available.
Args:
imports: map from string to string, the map of imported files names
and contents.
"""
if imports is not None:
imps, parents = process_imports(imports)
FileAccessRedirector.allowed_imports = imps
FileAccessRedirector.parents = parents
# Prepend our module handler before standard ones.
sys.meta_path = [AllowedImportsHandler()] + sys.meta_path

@ -4,7 +4,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@ -28,7 +28,7 @@ PROPERTIES = "properties"
# This does not return a complete set of errors; use only for setting defaults.
# Pass this object a schema to get a validator for that schema.
DEFAULT_SETTER = schema_validation_utils.ExtendWithDefault(
jsonschema.Draft4Validator)
jsonschema.Draft4Validator)
# This is a regular validator, use after using the DEFAULT_SETTER
# Pass this object a schema to get a validator for that schema.
@ -37,191 +37,204 @@ VALIDATOR = jsonschema.Draft4Validator
# This is a validator using the default Draft4 metaschema,
# use it to validate user schemas.
SCHEMA_VALIDATOR = jsonschema.Draft4Validator(
jsonschema.Draft4Validator.META_SCHEMA)
jsonschema.Draft4Validator.META_SCHEMA)
# JsonSchema to be used to validate the user's "imports:" section
IMPORT_SCHEMA = """
properties:
imports:
type: array
items:
type: object
required:
- path
properties:
path:
type: string
name:
type: string
additionalProperties: false
uniqueItems: true
properties:
imports:
type: array
items:
type: object
required:
- path
properties:
path:
type: string
name:
type: string
additionalProperties: false
uniqueItems: true
"""
# Validator to be used against the "imports:" section of a schema
IMPORT_SCHEMA_VALIDATOR = jsonschema.Draft4Validator(
yaml.safe_load(IMPORT_SCHEMA))
yaml.safe_load(IMPORT_SCHEMA))
def _ValidateSchema(schema, validating_imports, schema_name, template_name):
"""Validate that the passed in schema file is correctly formatted.
Args:
schema: contents of the schema file
validating_imports: boolean, if we should validate the 'imports'
section of the schema
schema_name: name of the schema file to validate
template_name: name of the template whose properties are being validated
Raises:
ValidationErrors: A list of ValidationError errors that occured when
"""Validate that the passed in schema file is correctly formatted.
Args:
schema:
contents of the schema file
validating_imports:
boolean, if we should validate the 'imports' section of the schema
schema_name:
name of the schema file to validate
template_name:
name of the template whose properties are being validated
Raises:
ValidationErrors: A list of ValidationError errors that occured when
validating the schema file
"""
schema_errors = []
"""
schema_errors = []
# Validate the syntax of the optional "imports:" section of the schema
if validating_imports:
schema_errors.extend(IMPORT_SCHEMA_VALIDATOR.iter_errors(schema))
# Validate the syntax of the optional "imports:" section of the schema
if validating_imports:
schema_errors.extend(IMPORT_SCHEMA_VALIDATOR.iter_errors(schema))
# Validate the syntax of the jsonSchema section of the schema
try:
schema_errors.extend(SCHEMA_VALIDATOR.iter_errors(schema))
except jsonschema.RefResolutionError as e:
# Calls to iter_errors could throw a RefResolution exception
raise ValidationErrors(schema_name, template_name,
[e], is_schema_error=True)
# Validate the syntax of the jsonSchema section of the schema
try:
schema_errors.extend(SCHEMA_VALIDATOR.iter_errors(schema))
except jsonschema.RefResolutionError as e:
# Calls to iter_errors could throw a RefResolution exception
raise ValidationErrors(schema_name, template_name,
[e], is_schema_error=True)
if schema_errors:
raise ValidationErrors(schema_name, template_name,
schema_errors, is_schema_error=True)
if schema_errors:
raise ValidationErrors(schema_name, template_name,
schema_errors, is_schema_error=True)
def Validate(properties, schema_name, template_name, imports):
"""Given a set of properties, validates it against the given schema.
"""Given a set of properties, validates it against the given schema.
Args:
properties: dict, the properties to be validated
schema_name: name of the schema file to validate
template_name: name of the template whose properties are being validated
imports: the map of imported files names to file contents
Args:
properties:
dict, the properties to be validated
schema_name:
name of the schema file to validate
template_name:
name of the template whose properties are being validated
imports:
the map of imported files names to file contents
Returns:
Dict containing the validated properties, with defaults filled in
Returns:
Dict containing the validated properties, with defaults filled in
Raises:
ValidationErrors: A list of ValidationError errors that occurred when
Raises:
ValidationErrors: A list of ValidationError errors that occurred when
validating the properties and schema,
or if the schema file was not found
"""
if schema_name not in imports:
raise ValidationErrors(schema_name, template_name,
["Could not find schema file '%s'." % schema_name])
raw_schema = imports[schema_name]
if properties is None:
properties = {}
schema = yaml.safe_load(raw_schema)
"""
if schema_name not in imports:
raise ValidationErrors(schema_name, template_name,
["Could not find schema file '%s'." %
schema_name])
raw_schema = imports[schema_name]
if properties is None:
properties = {}
schema = yaml.safe_load(raw_schema)
# If the schema is empty, do nothing.
if not schema:
return properties
validating_imports = IMPORTS in schema and schema[IMPORTS]
# If this doesn't raise any exceptions,we can assume we have a valid schema
_ValidateSchema(schema, validating_imports, schema_name, template_name)
errors = []
# Validate that all files specified as "imports:" were included
if validating_imports:
# We have already validated that "imports:"
# is a list of unique "path/name" maps
for import_object in schema[IMPORTS]:
if "name" in import_object:
import_name = import_object["name"]
else:
import_name = import_object["path"]
if import_name not in imports:
errors.append(("File '%s' requested in schema '%s' "
"but not included with imports."
% (import_name, schema_name)))
try:
# This code block uses DEFAULT_SETTER and VALIDATOR for two very
# different purposes.
# DEFAULT_SETTER is based on JSONSchema 4,but uses modified validators:
# - The 'required' validator does nothing
# - The 'properties' validator sets default values on user properties
# With these changes, the validator does not report errors correctly.
#
# So, we do error reporting in two steps:
# 1) Use DEFAULT_SETTER to set default values in the user's properties
# 2) Use the unmodified VALIDATOR to report all of the errors
# Calling iter_errors mutates properties in place,
# adding default values.
# You must call list()! This is a generator, not a function!
list(DEFAULT_SETTER(schema).iter_errors(properties))
# Now that we have default values, validate the properties
errors.extend(list(VALIDATOR(schema).iter_errors(properties)))
if errors:
raise ValidationErrors(schema_name, template_name, errors)
except jsonschema.RefResolutionError as e:
# Calls to iter_errors could throw a RefResolution exception
raise ValidationErrors(schema_name, template_name,
[e], is_schema_error=True)
except TypeError as e:
raise ValidationErrors(
schema_name, template_name,
[e, "Perhaps you forgot to put 'quotes' \
around your reference."],
is_schema_error=True)
# If the schema is empty, do nothing.
if not schema:
return properties
validating_imports = IMPORTS in schema and schema[IMPORTS]
# If this doesn't raise any exceptions, we can assume we have a valid schema
_ValidateSchema(schema, validating_imports, schema_name, template_name)
errors = []
# Validate that all files specified as "imports:" were included
if validating_imports:
# We have already validated that "imports:"
# is a list of unique "path/name" maps
for import_object in schema[IMPORTS]:
if "name" in import_object:
import_name = import_object["name"]
else:
import_name = import_object["path"]
if import_name not in imports:
errors.append(("File '%s' requested in schema '%s' "
"but not included with imports."
% (import_name, schema_name)))
try:
# This code block uses DEFAULT_SETTER and VALIDATOR for two very
# different purposes.
# DEFAULT_SETTER is based on JSONSchema 4, but uses modified validators:
# - The 'required' validator does nothing
# - The 'properties' validator sets default values on user properties
# With these changes, the validator does not report errors correctly.
#
# So, we do error reporting in two steps:
# 1) Use DEFAULT_SETTER to set default values in the user's properties
# 2) Use the unmodified VALIDATOR to report all of the errors
# Calling iter_errors mutates properties in place, adding default values.
# You must call list()! This is a generator, not a function!
list(DEFAULT_SETTER(schema).iter_errors(properties))
# Now that we have default values, validate the properties
errors.extend(list(VALIDATOR(schema).iter_errors(properties)))
if errors:
raise ValidationErrors(schema_name, template_name, errors)
except jsonschema.RefResolutionError as e:
# Calls to iter_errors could throw a RefResolution exception
raise ValidationErrors(schema_name, template_name,
[e], is_schema_error=True)
except TypeError as e:
raise ValidationErrors(
schema_name, template_name,
[e, "Perhaps you forgot to put 'quotes' around your reference."],
is_schema_error=True)
return properties
class ValidationErrors(Exception):
"""Exception raised for errors during validation process.
"""Exception raised for errors during validation process.
The errors could have occured either in the schema xor in the properties
The errors could have occured either in the schema xor in the properties
Attributes:
is_schema_error: Boolean, either an invalid schema, or invalid properties
errors: List of ValidationError type objects
"""
def BuildMessage(self):
"""Builds a human readable message from a list of jsonschema errors.
Returns:
A string in a human readable message format.
Attributes:
is_schema_error: Boolean, either an invalid schema,
or invalid properties
errors: List of ValidationError type objects
"""
if self.is_schema_error:
message = "Invalid schema '%s':\n" % self.schema_name
else:
message = "Invalid properties for '%s':\n" % self.template_name
for error in self.errors:
if isinstance(error, jsonschema.exceptions.ValidationError):
error_message = error.message
location = list(error.path)
if location and len(location):
error_message += " at " + str(location)
# If location is empty the error happened at the root of the schema
else:
error_message = str(error)
message += error_message + "\n"
return message
def __init__(self, schema_name, template_name, errors, is_schema_error=False):
self.schema_name = schema_name
self.template_name = template_name
self.errors = errors
self.is_schema_error = is_schema_error
self.message = self.BuildMessage()
super(ValidationErrors, self).__init__(self.message)
def BuildMessage(self):
"""Builds a human readable message from a list of jsonschema errors.
Returns:
A string in a human readable message format.
"""
if self.is_schema_error:
message = "Invalid schema '%s':\n" % self.schema_name
else:
message = "Invalid properties for '%s':\n" % self.template_name
for error in self.errors:
if isinstance(error, jsonschema.exceptions.ValidationError):
error_message = error.message
location = list(error.path)
if location and len(location):
error_message += " at " + str(location)
# If location is empty the error happened at
# the root of the schema
else:
error_message = str(error)
message += error_message + "\n"
return message
def __init__(self, schema_name, template_name,
errors, is_schema_error=False):
self.schema_name = schema_name
self.template_name = template_name
self.errors = errors
self.is_schema_error = is_schema_error
self.message = self.BuildMessage()
super(ValidationErrors, self).__init__(self.message)

File diff suppressed because it is too large Load Diff

@ -4,7 +4,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@ -23,74 +23,77 @@ REQUIRED = 'required'
def ExtendWithDefault(validator_class):
"""Takes a validator and makes it set default values on properties.
"""Takes a validator and makes it set default values on properties.
Args:
validator_class: A class to add our overridden validators to
Args:
validator_class: A class to add our overridden validators to
Returns:
A validator_class that will set default values and ignore required fields
"""
validate_properties = validator_class.VALIDATORS['properties']
Returns:
A validator_class that will set default values
and ignore required fields
"""
validate_properties = validator_class.VALIDATORS['properties']
def SetDefaultsInProperties(validator, user_schema, user_properties,
parent_schema):
SetDefaults(validator, user_schema or {}, user_properties, parent_schema,
validate_properties)
def SetDefaultsInProperties(validator, user_schema, user_properties,
parent_schema):
SetDefaults(validator, user_schema or {}, user_properties,
parent_schema, validate_properties)
return jsonschema.validators.extend(
validator_class, {PROPERTIES: SetDefaultsInProperties,
REQUIRED: IgnoreKeyword})
return jsonschema.validators.extend(
validator_class, {PROPERTIES: SetDefaultsInProperties,
REQUIRED: IgnoreKeyword})
def SetDefaults(validator, user_schema, user_properties, parent_schema,
validate_properties):
"""Populate the default values of properties.
Args:
validator: A generator that validates the "properties" keyword of the schema
user_schema: Schema which might define defaults, might be a nested part of
the entire schema file.
user_properties: User provided values which we are setting defaults on
parent_schema: Schema object that contains the schema being evaluated on
this pass, user_schema.
validate_properties: Validator function, called recursively.
"""
for schema_property, subschema in user_schema.iteritems():
# The ordering of these conditions assumes that '$ref' blocks override
# all other schema info, which is what the jsonschema library assumes.
# If the subschema has a reference,
# see if that reference defines a 'default' value
if REF in subschema:
out = ResolveReferencedDefault(validator, subschema[REF])
user_properties.setdefault(schema_property, out)
# Otherwise, see if the subschema has a 'default' value
elif DEFAULT in subschema:
user_properties.setdefault(schema_property, subschema[DEFAULT])
# Recursively apply defaults. This is a generator, so we must wrap with list()
list(validate_properties(validator, user_schema,
user_properties, parent_schema))
"""Populate the default values of properties.
Args:
validator: A generator that validates the "properties" keyword
of the schema
user_schema: Schema which might define defaults, might be a nested
part of the entire schema file.
user_properties: User provided values which we are setting defaults on
parent_schema: Schema object that contains the schema being
evaluated on this pass, user_schema.
validate_properties: Validator function, called recursively.
"""
for schema_property, subschema in user_schema.iteritems():
# The ordering of these conditions assumes that '$ref' blocks override
# all other schema info, which is what the jsonschema library assumes.
# If the subschema has a reference,
# see if that reference defines a 'default' value
if REF in subschema:
out = ResolveReferencedDefault(validator, subschema[REF])
user_properties.setdefault(schema_property, out)
# Otherwise, see if the subschema has a 'default' value
elif DEFAULT in subschema:
user_properties.setdefault(schema_property, subschema[DEFAULT])
# Recursively apply defaults. This is a generator, we must wrap with list()
list(validate_properties(validator, user_schema,
user_properties, parent_schema))
def ResolveReferencedDefault(validator, ref):
"""Resolves a reference, and returns any default value it defines.
"""Resolves a reference, and returns any default value it defines.
Args:
validator: A generator that validates the "$ref" keyword
ref: The target of the "$ref" keyword
Args:
validator: A generator that validates the "$ref" keyword
ref: The target of the "$ref" keyword
Returns:
The value of the 'default' field found in the referenced schema, or None
"""
with validator.resolver.resolving(ref) as resolved:
if DEFAULT in resolved:
return resolved[DEFAULT]
Returns:
The value of the 'default' field found in the referenced schema,
or None
"""
with validator.resolver.resolving(ref) as resolved:
if DEFAULT in resolved:
return resolved[DEFAULT]
def IgnoreKeyword(
unused_validator, unused_required, unused_instance, unused_schema):
"""Validator for JsonSchema that does nothing."""
pass
unused_validator, unused_required, unused_instance, unused_schema):
"""Validator for JsonSchema that does nothing."""
pass

Loading…
Cancel
Save