Add references library that handles outputs, and stop schema validation from validating properties who's value is a reference.

pull/167/head^2^2
Graham Welch 10 years ago
parent 62f1948607
commit 83876d6b69

@ -0,0 +1,221 @@
"""Util to handle references during expansion."""
import jsonpath
import re
# Generally the regex is for this pattern: $(ref.NAME.PATH)
# NOTE: This will find matches when a reference is present, but the 2nd group
# does NOT necessarily match the 'path' in the reference.
REF_PATTERN = re.compile(r'\$\(ref\.(.*?)\.(.*)\)')
# Regex for the beginning of a reference.
REF_PREFIX_PATTERN = re.compile(r'\$\(ref\.')
def HasReference(string):
"""Returns true if the string contains a reference.
We are only looking for the first part of a reference, from there we assume
the user meant to use a reference, and will fail at a later point if no
complete reference is found.
Args:
string: The string to parse to see if it contains '$(ref.'.
Returns:
True if there is at least one reference inside the string.
Raises:
ExpansionReferenceError: If we see '$(ref.' but do not find a complete
reference, we raise this error.
"""
return ReferenceMatcher(string).FindReference()
def _BuildReference(name, path):
"""Takes name and path and returns '$(ref.name.path)'.
Args:
name: String, name of the resource being referenced.
path: String, jsonPath to the value being referenced.
Returns:
String, the complete reference string in the expected format.
"""
return '$(ref.%s.%s)' % (name, path)
def _ExtractWithJsonPath(ref_obj, name, path, raise_exception=True):
"""Given a path and an object, use jsonpath to extract the value.
Args:
ref_obj: Dict obj, the thing being referenced.
name: Name of the resource being referenced, for the error message.
path: Path to follow on the ref_obj to get the desired value.
raise_exception: boolean, set to False and this function will return None
if no value was found at the path instead of throwing an exception.
Returns:
Either the value found at the path, or if raise_exception=False, None.
Raises:
ExpansionReferenceError: if there was a error when evaluation the path, or
no value was found.
"""
try:
result = jsonpath.jsonpath(ref_obj, path)
# jsonpath should either return a list or False
if not isinstance(result, list):
if raise_exception:
raise Exception('No value found.')
return None
# If jsonpath returns a list of a single item, it is lying.
# It really found that item, and put it in a list.
# If the reference is to a list, jsonpath will return a list of a list.
if len(result) == 1:
return result[0]
# But if jsonpath returns a list with multiple elements, the path involved
# wildcards, and the user expects a list of results.
return result
# This will usually be an IndexOutOfBounds error, but not always...
except Exception as e: # pylint: disable=broad-except
if raise_exception:
raise ExpansionReferenceError(_BuildReference(name, path), e.message)
return None
def PopulateReferences(node, output_map):
return _TraverseNode(node, None, output_map)
def _TraverseNode(node, list_references=None, output_map=None):
"""Traverse a dict/list/element to find and resolve references.
Same as DocumentReferenceHandler.java. This function traverses a dictionary
that can contain dicts, lists, and elements.
Args:
node: Object to traverse: dict, list, or string
list_references: If present, we will append all references we find to it.
References will be in the form of a (name, path) tuple.
output_map: Map of resource name to map of output object name to output
value. If present, we will replace references with the values they
reference.
Returns:
The node. If we were provided an output_map, we'll replace references with
the value they reference.
"""
if isinstance(node, dict):
for key in node:
node[key] = _TraverseNode(node[key], list_references, output_map)
elif isinstance(node, list):
for i in range(len(node)):
node[i] = _TraverseNode(node[i], list_references, output_map)
elif isinstance(node, str):
rm = ReferenceMatcher(node)
while rm.FindReference():
if list_references is not None:
list_references.append((rm.name, rm.path))
if output_map is not None:
if rm.name not in output_map:
continue
# It is possible that an output value and real resource share a name.
# In this case, a path could be valid for the real resource but not the
# output. So we don't fail, we let the reference exists as is.
value = _ExtractWithJsonPath(output_map[rm.name], rm.name, rm.path,
raise_exception=True)
if value is not None:
node = node.replace(_BuildReference(rm.name, rm.path), value)
return node
class ReferenceMatcher(object):
"""Finds and extracts references from strings.
Same as DocumentReferenceHandler.java. This class is meant to be similar to
the re2 matcher class, but specifically tuned for references.
"""
content = None
name = None
path = None
def __init__(self, content):
self.content = content
def FindReference(self):
"""Returns True if the string contains a reference and saves it.
Returns:
True if the content still contains a reference. If so, it also updates the
name and path values to that of the most recently found reference. At the
same time it moves the pointer in the content forward to be ready to find
the next reference.
Raises:
ExpansionReferenceError: If we see '$(ref.' but do not find a complete
reference, we raise this error.
"""
# First see if the content contains '$(ref.'
if not REF_PREFIX_PATTERN.search(self.content):
return False
# If so, then we say there is a reference here.
# Next make sure we find NAME and some PATH with the close paren
match = REF_PATTERN.search(self.content)
if not match:
# Has '$(ref.' but not '$(ref.NAME.PATH)'
raise ExpansionReferenceError(self.content, 'Malformed reference.')
# The regex matcher can only tell us that a complete reference exists.
# We need to count parentheses to find the end of the reference.
# Consider "$(ref.NAME.path())())" which is a string containing a reference
# and ending with "())"
open_group = 1 # Count the first '(' in '$(ref...'
end_ref = 0 # To hold the position of the end of the reference
end_name = match.end(1) # The position of the end of the name
# Iterate through the path until we find the matching close paren to the
# open paren that started the reference.
for i in xrange(end_name, len(self.content)):
c = self.content[i]
if c == '(':
open_group += 1
elif c == ')':
open_group -= 1
# Once we have matched all of our open parens, we have found the end.
if open_group == 0:
end_ref = i
break
if open_group != 0:
# There are unmatched parens.
raise ExpansionReferenceError(self.content, 'Malformed reference.')
# Save the name
self.name = match.group(1)
# Skip the period after name, and save the path
self.path = self.content[end_name + 1: end_ref]
# Move the content forward to be ready to find the next reference
self.content = self.content[end_ref:]
return True
class ExpansionReferenceError(Exception):
"""Exception raised when jsonPath cannot find the referenced value.
Attributes:
reference: the reference processed that results in the error
message: the detailed message of the error
"""
def __init__(self, reference, message):
self.reference = reference
self.message = message + ' Reference: ' + str(reference)
super(ExpansionReferenceError, self).__init__(self.message)

@ -0,0 +1,274 @@
######################################################################
# Copyright 2015 The Kubernetes Authors All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
######################################################################
"""Basic unit tests for template expansion references library."""
import unittest
from references import _ExtractWithJsonPath
from references import _TraverseNode
from references import ExpansionReferenceError
from references import HasReference
from references import ReferenceMatcher
class ReferencesTest(unittest.TestCase):
# Tests for HasReference
def testBasicReference(self):
self.assertTrue(HasReference('$(ref.name.path)'))
def testEmbeddedReference(self):
self.assertTrue(HasReference('contains reference $(ref.name.path) EOM'))
def testComplexPath(self):
self.assertTrue(HasReference('$(ref.name.path[0].to().very.cool["thing"])'))
def testComplexName(self):
self.assertTrue(HasReference('$(ref.name-is-superCool.path)'))
def testMissingGroupClose(self):
try:
HasReference('almost a reference $(ref.name.path')
self.Fail('Expected Reference exception')
except ExpansionReferenceError as e:
self.assertTrue('Malformed' in e.message)
self.assertTrue('$(ref.name.path' in e.message)
def testMissingGroupOpen(self):
# Not close enough to find a match
self.assertFalse(HasReference('almost a reference $ref.name.path)'))
def testMissingPath(self):
try:
self.assertTrue(HasReference('almost a reference $(ref.name)'))
self.Fail('Expected Reference exception')
except ExpansionReferenceError as e:
self.assertTrue('Malformed' in e.message)
self.assertTrue('$(ref.name)' in e.message)
def testUnmatchedParens(self):
try:
self.assertTrue(HasReference('almost a reference $(ref.name.path()'))
self.Fail('Expected Reference exception')
except ExpansionReferenceError as e:
self.assertTrue('Malformed' in e.message)
self.assertTrue('$(ref.name.path()' in e.message)
def testMissingRef(self):
self.assertFalse(HasReference('almost a reference $(name.path)'))
# Test for ReferenceMatcher
def testMatchBasic(self):
matcher = ReferenceMatcher('$(ref.NAME.PATH)')
self.assertTrue(matcher.FindReference())
self.assertEquals(matcher.name, 'NAME')
self.assertEquals(matcher.path, 'PATH')
self.assertFalse(matcher.FindReference())
def testMatchComplexPath(self):
matcher = ReferenceMatcher('inside a $(ref.NAME.path[?(@.price<10)].val)!')
self.assertTrue(matcher.FindReference())
self.assertEquals(matcher.name, 'NAME')
self.assertEquals(matcher.path, 'path[?(@.price<10)].val')
self.assertFalse(matcher.FindReference())
def testMatchInString(self):
matcher = ReferenceMatcher('inside a $(ref.NAME.PATH) string')
self.assertTrue(matcher.FindReference())
self.assertEquals(matcher.name, 'NAME')
self.assertEquals(matcher.path, 'PATH')
self.assertFalse(matcher.FindReference())
def testMatchTwo(self):
matcher = ReferenceMatcher('two $(ref.NAME1.PATH1) inside '
'a $(ref.NAME2.PATH2) string')
self.assertTrue(matcher.FindReference())
self.assertEquals(matcher.name, 'NAME1')
self.assertEquals(matcher.path, 'PATH1')
self.assertTrue(matcher.FindReference())
self.assertEquals(matcher.name, 'NAME2')
self.assertEquals(matcher.path, 'PATH2')
self.assertFalse(matcher.FindReference())
def testMatchGoodAndBad(self):
matcher = ReferenceMatcher('$(ref.NAME.PATH) good and $(ref.NAME.PATH bad')
self.assertTrue(matcher.FindReference())
self.assertEquals(matcher.name, 'NAME')
self.assertEquals(matcher.path, 'PATH')
try:
matcher.FindReference()
self.Fail('Expected Reference exception')
except ExpansionReferenceError as e:
self.assertTrue('Malformed' in e.message)
self.assertTrue('$(ref.NAME.PATH bad' in e.message)
def testAlmostMatch(self):
matcher = ReferenceMatcher('inside a $(ref.NAME.PATH with no close paren')
try:
matcher.FindReference()
self.Fail('Expected Reference exception')
except ExpansionReferenceError as e:
self.assertTrue('Malformed' in e.message)
self.assertTrue('$(ref.NAME.PATH ' in e.message)
# Tests for _TraverseNode
def testFindAllReferences(self):
ref_list = []
node = {'a': ['a $(ref.name1.path1) string',
'$(ref.name2.path2)',
123,],
'b': {'a1': 'another $(ref.name3.path3) string',},
'c': 'yet another $(ref.name4.path4) string',}
traversed_node = _TraverseNode(node, ref_list, None)
self.assertEquals(node, traversed_node)
self.assertEquals(4, len(ref_list))
self.assertTrue(('name1', 'path1') in ref_list)
self.assertTrue(('name2', 'path2') in ref_list)
self.assertTrue(('name3', 'path3') in ref_list)
self.assertTrue(('name4', 'path4') in ref_list)
def testReplaceReference(self):
ref_map = {'name1': {'path1a': '1a',
'path1b': '1b',},
'name2': {'path2a': '2a',},}
node = {'a': ['a $(ref.name1.path1a) string',
'$(ref.name2.path2a)',
123,],
'b': {'a1': 'another $(ref.name1.path1b) string',},
'c': 'yet another $(ref.name2.path2a)$(ref.name2.path2a) string',}
expt = {'a': ['a 1a string',
'2a',
123,],
'b': {'a1': 'another 1b string',},
'c': 'yet another 2a2a string',}
self.assertEquals(expt, _TraverseNode(node, None, ref_map))
def testReplaceNotFoundReferencePath(self):
ref_map = {'name1': {'path1a': '1a',},}
node = {'a': ['a $(ref.name1.path1a) string',
'b $(ref.name1.path1b)',
'c $(ref.name2.path2a)'],}
try:
_TraverseNode(node, None, ref_map)
self.Fail('Expected ExpansionReferenceError')
except ExpansionReferenceError as e:
self.assertTrue('No value found' in e.message)
self.assertTrue('$(ref.name1.path1b)' in e.message)
def testReplaceNotFoundReferenceName(self):
ref_map = {'name1': {'path1a': '1a',},}
node = {'a': ['a $(ref.name1.path1a) string',
'c $(ref.name2.path2a)'],}
expt = {'a': ['a 1a string',
'c $(ref.name2.path2a)'],}
self.assertEquals(expt, _TraverseNode(node, None, ref_map))
# Tests for _ExtractWithJsonPath
def testExtractFromList(self):
ref_map = {'a': ['one', 'two', 'three',],}
self.assertEquals('two', _ExtractWithJsonPath(ref_map, 'foo', 'a[1]'))
def testExtractFromMap(self):
ref_map = {'a': {'b': {'c': 'd'}}}
self.assertEquals('d', _ExtractWithJsonPath(ref_map, 'foo', 'a.b.c'))
def testExtractList(self):
ref_map = {'a': ['one', 'two', 'three',],}
self.assertEquals(['one', 'two', 'three',],
_ExtractWithJsonPath(ref_map, 'foo', 'a'))
def testExtractListOfSingleItem(self):
ref_map = {'a': ['one'],}
self.assertEquals(['one'], _ExtractWithJsonPath(ref_map, 'foo', 'a'))
def testExtractListWithWildcard(self):
ref_map = {'a': ['one', 'two', 'three',],}
self.assertEquals(['one', 'two', 'three',],
_ExtractWithJsonPath(ref_map, 'foo', 'a[*]'))
def testExtractMap(self):
ref_map = {'a': {'b': {'c': 'd'}}}
self.assertEquals({'c': 'd'}, _ExtractWithJsonPath(ref_map, 'foo', 'a.b'))
def testExtractFalse(self):
ref_map = {'a': False}
self.assertEquals(False, _ExtractWithJsonPath(ref_map, 'foo', 'a'))
def testExtractFail_BadIndex(self):
ref_map = {'a': ['one', 'two', 'three',],}
# IndexError
try:
_ExtractWithJsonPath(ref_map, 'foo', 'a[3]')
self.fail('Expected Reference error')
except ExpansionReferenceError as e:
self.assertTrue('foo.a[3]' in e.message)
self.assertTrue('index out of range' in e.message)
self.assertFalse(_ExtractWithJsonPath(ref_map, 'foo', 'a[3]',
raise_exception=False))
def testExtractFail_NotAList(self):
ref_map = {'a': {'b': {'c': 'd'}}}
try:
_ExtractWithJsonPath(ref_map, 'foo', 'a.b[0]')
self.fail('Expected Reference error')
except ExpansionReferenceError as e:
self.assertTrue('foo.a.b[0]' in e.message)
self.assertTrue('No value found.' in e.message)
self.assertFalse(_ExtractWithJsonPath(ref_map, 'foo', 'a.b[0]',
raise_exception=False))
def testExtractFail_BadKey(self):
ref_map = {'a': {'b': {'c': 'd'}}}
self.assertFalse(_ExtractWithJsonPath(ref_map, 'foo', 'a.b.d',
raise_exception=False))
def testExtractFail_NoObject(self):
ref_map = {'a': {'b': {'c': 'd'}}}
self.assertFalse(_ExtractWithJsonPath(ref_map, 'foo', 'a.b.c.d',
raise_exception=False))
def testExtractFail_MalformedPath(self):
ref_map = {'a': {'b': {'c': 'd'}}}
self.assertFalse(_ExtractWithJsonPath(ref_map, 'foo', 'a.b[2',
raise_exception=False))
if __name__ == '__main__':
unittest.main()

@ -17,6 +17,7 @@
import jsonschema
import yaml
import references
import schema_validation_utils
@ -61,6 +62,12 @@ IMPORT_SCHEMA_VALIDATOR = jsonschema.Draft4Validator(
yaml.safe_load(IMPORT_SCHEMA))
def _FilterReferences(error_generator):
for error in error_generator:
if not references.HasReference(str(error.instance)):
yield error
def _ValidateSchema(schema, validating_imports, schema_name, template_name):
"""Validate that the passed in schema file is correctly formatted.
@ -165,7 +172,7 @@ def Validate(properties, schema_name, template_name, imports):
list(DEFAULT_SETTER(schema).iter_errors(properties))
# Now that we have default values, validate the properties
errors.extend(list(VALIDATOR(schema).iter_errors(properties)))
errors.extend(_FilterReferences(VALIDATOR(schema).iter_errors(properties)))
if errors:
raise ValidationErrors(schema_name, template_name, errors)

@ -613,5 +613,54 @@ class SchemaValidationTest(unittest.TestCase):
self.assertIn("is not of type 'array' at ['imports']", e.message)
self.assertIn("is not of type u'array' at [u'required']", e.message)
def testNoValidateReference_Simple(self):
schema = """
properties:
number:
type: integer
"""
properties = """
number: $(ref.foo.size)
"""
self.assertEquals(yaml.safe_load(properties),
RawValidate(properties, 'schema', schema))
def testNoValidateReference_OtherErrorNotFiltered(self):
schema = """
properties:
number:
type: integer
also-number:
type: integer
"""
properties = """
number: $(ref.foo.size)
also-number: not a number
"""
try:
RawValidate(properties, 'schema', schema)
self.fail('Validation should fail')
except schema_validation.ValidationErrors as e:
self.assertEquals(1, len(e.errors))
def testNoValidateReference_NestedError(self):
schema_name = 'nested_objects.py.schema'
schema = ReadTestFile(schema_name)
properties = """
one:
name: my-database
size: $(ref.other-database.size)
two:
name: other-database
size: really big
"""
try:
RawValidate(properties, schema_name, schema)
self.fail('Validation should fail')
except schema_validation.ValidationErrors as e:
self.assertEqual(1, len(e.errors))
self.assertIn("is not of type 'integer' at ['two', 'size']", e.message)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,20 @@
info:
title: Schema with properties that are themselves objects
imports:
properties:
one:
type: object
properties:
name:
type: string
size:
type: integer
two:
type: object
properties:
name:
type: string
size:
type: integer
Loading…
Cancel
Save