Skip to content
Snippets Groups Projects
swagger_model.py 25.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • 
    # Asterisk -- An open source telephony toolkit.
    #
    # Copyright (C) 2013, Digium, Inc.
    #
    # David M. Lee, II <dlee@digium.com>
    #
    # See http://www.asterisk.org for more information about
    # the Asterisk project. Please do not directly contact
    # any of the maintainers of this project for assistance;
    # the project provides a web site, mailing lists and IRC
    # channels for your use.
    #
    # This program is free software, distributed under the terms of
    # the GNU General Public License Version 2. See the LICENSE file
    # at the top of the source tree.
    #
    
    """Swagger data model objects.
    
    These objects should map directly to the Swagger api-docs, without a lot of
    additional fields. In the process of translation, it should also validate the
    model for consistency against the Swagger spec (i.e., fail if fields are
    missing, or have incorrect values).
    
    See https://github.com/wordnik/swagger-core/wiki/API-Declaration for the spec.
    """
    
    import json
    import os.path
    import pprint
    import re
    import sys
    import traceback
    
    # We don't fully support Swagger 1.2, but we need it for subtyping
    SWAGGER_VERSIONS = ["1.1", "1.2"]
    
    SWAGGER_PRIMITIVES = [
        'void',
        'string',
        'boolean',
        'number',
        'int',
        'long',
        'double',
        'float',
        'Date',
    ]
    
    
    class Stringify(object):
        """Mix-in that gives model classes a repr listing their attributes.

        The repr is the object's class followed by a safe repr of its
        instance dictionary.
        """
        def __repr__(self):
            attributes = pprint.saferepr(self.__dict__)
            return "%s(%s)" % (self.__class__, attributes)
    
    
    def compare_versions(lhs, rhs):
        '''Performs a component-wise numeric comparison of two version numbers.

        This properly handles simple major.minor.whatever.sure.why.not version
        numbers, but fails miserably (int() raises ValueError) if there's any
        letters in there.

        For reference:
          1.0 == 1.0
          1.0 < 1.0.1
          1.2 < 1.10

        @param lhs Left hand side of the comparison
        @param rhs Right hand side of the comparison
        @return -1 if lhs  < rhs
        @return  0 if lhs == rhs
        @return  1 if lhs  > rhs
        '''
        lhs_parts = [int(v) for v in lhs.split('.')]
        rhs_parts = [int(v) for v in rhs.split('.')]
        # The cmp() builtin only exists on Python 2; this expression yields the
        # same -1/0/1 result on both Python 2 and Python 3.
        return (lhs_parts > rhs_parts) - (lhs_parts < rhs_parts)
    
    
    class ParsingContext(object):
        """Context information for parsing.

        This object is immutable. To change contexts (like adding an item to the
        stack), use the next() and next_stack() functions to build a new one.
        """

        def __init__(self, swagger_version, stack):
            """Ctor.

            @param swagger_version: Swagger version string of the document being
                                    parsed (callers pass None before it is known).
            @param stack: List of strings describing the current location.
            """
            self.__swagger_version = swagger_version
            self.__stack = stack

        def __repr__(self):
            return "ParsingContext(swagger_version=%s, stack=%s)" % (
                self.swagger_version, self.stack)

        def get_swagger_version(self):
            return self.__swagger_version

        def get_stack(self):
            return self.__stack

        swagger_version = property(get_swagger_version)

        stack = property(get_stack)

        def version_less_than(self, ver):
            """Returns True if this context's Swagger version is older than ver.

            @param ver: Version string to compare against.
            """
            return compare_versions(self.swagger_version, ver) < 0

        def next_stack(self, json, id_field):
            """Returns a new item pushed to the stack.

            @param json: Current JSON object.
            @param id_field: Field identifying this object.
            @return New context with additional item in the stack.
            @raise SwaggerError: If id_field is not present in json.
            """
            if id_field not in json:
                raise SwaggerError("Missing id_field: %s" % id_field, self)
            # Concatenation builds a new list, leaving this context's stack alone.
            new_stack = self.stack + ['%s=%s' % (id_field, str(json[id_field]))]
            return ParsingContext(self.swagger_version, new_stack)

        def next(self, version=None, stack=None):
            """Returns a new context, overriding the version and/or the stack.

            @param version: New Swagger version; None keeps the current one.
            @param stack: New stack; None keeps the current one.
            @return New ParsingContext.
            """
            if version is None:
                # The attribute is swagger_version; there is no 'version'.
                version = self.swagger_version
            if stack is None:
                stack = self.stack
            return ParsingContext(version, stack)
    
    
    
    class SwaggerError(Exception):
        """Raised when an error is encountered mapping the JSON objects into the
        model.

        The message, context and cause are stored, in that order, in the
        exception's args tuple.
        """

        def __init__(self, msg, context, cause=None):
            """Ctor.

            @param msg: String message for the error.

            @param context: ParsingContext object

            @param cause: Optional exception that caused this one.
            """
            # super() must name this class, not its base, so that the normal
            # method resolution order is followed.
            super(SwaggerError, self).__init__(msg, context, cause)
    
    
    class SwaggerPostProcessor(object):
        """Post processing interface for model objects. This processor can add
        fields to model objects for additional information to use in the
        templates.

        Every hook in this base class is a no-op.
        """

        def process_resource_api(self, resource_api, context):
            """Post process a ResourceApi object.

            @param resource_api: ResourceApi object.
            @param context: Current context in the API.
            """
            pass

        def process_api(self, api, context):
            """Post process an Api object.

            @param api: Api object.
            @param context: Current context in the API.
            """
            pass

        def process_operation(self, operation, context):
            """Post process a Operation object.

            @param operation: Operation object.
            @param context: Current context in the API.
            """
            pass

        def process_parameter(self, parameter, context):
            """Post process a Parameter object.

            @param parameter: Parameter object.
            @param context: Current context in the API.
            """
            pass

        def process_model(self, model, context):
            """Post process a Model object.

            @param model: Model object.
            @param context: Current context in the API.
            """
            pass

        def process_property(self, property, context):
            """Post process a Property object.

            @param property: Property object.
            @param context: Current context in the API.
            """
            pass

        def process_type(self, swagger_type, context):
            """Post process a SwaggerType object.

            @param swagger_type: SwaggerType object.
            @param context: Current context in the API.
            """
            pass

        def process_resource_listing(self, resource_listing, context):
            """Post process the overall ResourceListing object.

            @param resource_listing: ResourceListing object.
            @param context: Current context in the API.
            """
            pass
    
    
    
    class AllowableRange(Stringify):
        """Model of a allowableValues of type RANGE

        See https://github.com/wordnik/swagger-core/wiki/datatypes#complex-types
        """
        def __init__(self, min_value, max_value):
            # Either bound may be None; see load_allowable_values().
            self.min_value, self.max_value = min_value, max_value

        def to_wiki(self):
            """Returns a one-line description of the range for wiki output."""
            template = "Allowed range: Min: {0}; Max: {1}"
            return template.format(self.min_value, self.max_value)
    
    
    
    class AllowableList(Stringify):
        """Model of a allowableValues of type LIST

        See https://github.com/wordnik/swagger-core/wiki/datatypes#complex-types
        """
        def __init__(self, values):
            self.values = values

        def to_wiki(self):
            """Returns a one-line, comma separated description of the values."""
            joined = ", ".join(self.values)
            return "Allowed values: {0}".format(joined)
    
    
    
    def load_allowable_values(json, context):
        """Parse a JSON allowableValues object.

        @param json: The allowableValues JSON object; may be None or empty.
        @param context: Current context in the API, attached to any error.
        @return None for a missing/empty object, otherwise an AllowableRange
                or AllowableList according to the object's valueType.
        @raise SwaggerError: If valueType is missing or unrecognized, if a RANGE
                             has neither min nor max, or a LIST has no values.
        """
        if not json:
            return None

        if 'valueType' not in json:
            raise SwaggerError("Missing valueType field", context)

        kind = json['valueType']

        if kind == 'RANGE':
            # A range is acceptable as long as at least one bound is present.
            if 'min' in json or 'max' in json:
                return AllowableRange(json.get('min'), json.get('max'))
            raise SwaggerError("Missing fields min/max", context)

        if kind == 'LIST':
            if 'values' in json:
                return AllowableList(json['values'])
            raise SwaggerError("Missing field values", context)

        raise SwaggerError("Unkown valueType %s" % kind, context)
    
    
    class Parameter(Stringify):
        """Model of an operation's parameter.

        See https://github.com/wordnik/swagger-core/wiki/parameters
        """

        required_fields = ['name', 'paramType', 'dataType']

        def __init__(self):
            self.param_type = None
            self.name = None
            self.description = None
            self.data_type = None
            self.required = None
            # Initialized here like every other attribute, so that it exists
            # even before load() has been called.
            self.default_value = None
            self.allowable_values = None
            self.allow_multiple = None

        def load(self, parameter_json, processor, context):
            """Populates this parameter from its JSON object.

            @param parameter_json: JSON object describing the parameter.
            @param processor: SwaggerPostProcessor to invoke on this object.
            @param context: Current context in the API.
            @return self
            @raise SwaggerError: If required fields are missing, the
                                 allowableValues are invalid, or the misspelled
                                 'allowedValues' field is present.
            """
            context = context.next_stack(parameter_json, 'name')
            validate_required_fields(parameter_json, self.required_fields, context)
            self.name = parameter_json.get('name')
            self.param_type = parameter_json.get('paramType')
            self.description = parameter_json.get('description') or ''
            self.data_type = parameter_json.get('dataType')
            self.required = parameter_json.get('required') or False
            self.default_value = parameter_json.get('defaultValue')
            self.allowable_values = load_allowable_values(
                parameter_json.get('allowableValues'), context)
            self.allow_multiple = parameter_json.get('allowMultiple') or False
            processor.process_parameter(self, context)

            # Catch a common misspelling of 'allowableValues'.
            if parameter_json.get('allowedValues'):
                raise SwaggerError(
                    "Field 'allowedValues' invalid; use 'allowableValues'",
                    context)

            return self

        def is_type(self, other_type):
            """Returns True if this parameter's paramType equals other_type."""
            return self.param_type == other_type
    
    
    class ErrorResponse(Stringify):
        """Model of an error response.

        See https://github.com/wordnik/swagger-core/wiki/errors
        """

        required_fields = ['code', 'reason']

        def __init__(self):
            self.code = self.reason = None

        def load(self, err_json, processor, context):
            """Populates this error response from its JSON object.

            @param err_json: JSON object describing the error response.
            @param processor: Post processor (not used by this model).
            @param context: Current context in the API.
            @return self
            """
            error_context = context.next_stack(err_json, 'code')
            validate_required_fields(
                err_json, self.required_fields, error_context)
            self.code, self.reason = (
                err_json.get('code'), err_json.get('reason'))
            return self
    
    
    
    class SwaggerType(Stringify):
        """Model of a data type.
        """

        def __init__(self):
            self.name = None
            self.is_discriminator = None
            self.is_list = None
            self.singular_name = None
            self.is_primitive = None

        def load(self, type_name, processor, context):
            """Populates this type from its Swagger type name.

            @param type_name: Type string, e.g. 'string' or 'List[string]'.
            @param processor: SwaggerPostProcessor to invoke on this object.
            @param context: Current context in the API.
            @return self
            @raise SwaggerError: If type_name is the common mistake 'integer'.
            """
            # Some common errors
            if type_name == 'integer':
                raise SwaggerError("The type for integer should be 'int'", context)

            self.name = type_name
            element_type = get_list_parameter_type(self.name)
            self.is_list = element_type is not None
            # For List[X] the singular name is X; otherwise it is the name itself.
            self.singular_name = element_type if self.is_list else self.name
            self.is_primitive = self.singular_name in SWAGGER_PRIMITIVES
            processor.process_type(self, context)
            return self
    
    
    
    class Operation(Stringify):
        """Model of an operation on an API

        See https://github.com/wordnik/swagger-core/wiki/API-Declaration#apis
        """

        required_fields = ['httpMethod', 'nickname', 'responseClass', 'summary']

        def __init__(self):
            self.http_method = None
            self.nickname = None
            self.response_class = None
            self.parameters = []
            self.summary = None
            self.notes = None
            self.error_responses = []

        def _parameters_of_type(self, param_type):
            """Returns the loaded parameters whose paramType is param_type."""
            return [p for p in self.parameters if p.is_type(param_type)]

        def load(self, op_json, processor, context):
            """Populates this operation from its JSON object.

            @param op_json: JSON object describing the operation.
            @param processor: SwaggerPostProcessor to invoke on this object and
                              on its response type and parameters.
            @param context: Current context in the API.
            @return self
            @raise SwaggerError: If required fields are missing, a websocket
                                 upgrade is requested on a non-GET operation, or
                                 more than one body parameter is declared.
            """
            context = context.next_stack(op_json, 'nickname')
            validate_required_fields(op_json, self.required_fields, context)
            self.http_method = op_json.get('httpMethod')
            self.nickname = op_json.get('nickname')
            response_class = op_json.get('responseClass')
            self.response_class = response_class and SwaggerType().load(
                response_class, processor, context)

            # Specifying WebSocket URL's is our own extension
            self.is_websocket = op_json.get('upgrade') == 'websocket'
            self.is_req = not self.is_websocket

            if self.is_websocket:
                self.websocket_protocol = op_json.get('websocketProtocol')
                if self.http_method != 'GET':
                    raise SwaggerError(
                        "upgrade: websocket is only valid on GET operations",
                        context)

            params_json = op_json.get('parameters') or []
            self.parameters = [
                Parameter().load(j, processor, context) for j in params_json]

            # The has_* flags are real booleans; 'x and True' would leave an
            # empty list in the flag when there are no such parameters.
            self.query_parameters = self._parameters_of_type('query')
            self.has_query_parameters = bool(self.query_parameters)
            self.path_parameters = self._parameters_of_type('path')
            self.has_path_parameters = bool(self.path_parameters)
            self.header_parameters = self._parameters_of_type('header')
            self.has_header_parameters = bool(self.header_parameters)
            self.has_parameters = self.has_query_parameters or \
                self.has_path_parameters or self.has_header_parameters

            # Body param is different, since there's at most one
            body_parameters = self._parameters_of_type('body')
            if len(body_parameters) > 1:
                raise SwaggerError("Cannot have more than one body param", context)
            # As before: the single body Parameter, or an empty list if none.
            self.body_parameter = body_parameters and body_parameters[0]
            self.has_body_parameter = bool(self.body_parameter)

            self.summary = op_json.get('summary')
            self.notes = op_json.get('notes')
            err_json = op_json.get('errorResponses') or []
            self.error_responses = [
                ErrorResponse().load(j, processor, context) for j in err_json]
            self.has_error_responses = bool(self.error_responses)

            processor.process_operation(self, context)
            return self
    
    
    class Api(Stringify):
        """Model of a single API in an API declaration.

        See https://github.com/wordnik/swagger-core/wiki/API-Declaration
        """

        required_fields = ['path', 'operations']

        def __init__(self,):
            self.path = None
            self.description = None
            self.operations = []

        def load(self, api_json, processor, context):
            """Populates this API from its JSON object.

            @param api_json: JSON object describing the API.
            @param processor: SwaggerPostProcessor to invoke on this object and
                              on its operations.
            @param context: Current context in the API.
            @return self
            @raise SwaggerError: If required fields are missing or an operation
                                 fails to load.
            """
            context = context.next_stack(api_json, 'path')
            validate_required_fields(api_json, self.required_fields, context)
            self.path = api_json.get('path')
            self.description = api_json.get('description')
            # The key is required, but its value may still be null.
            op_json = api_json.get('operations') or []
            self.operations = [
                Operation().load(j, processor, context) for j in op_json]
            # any() behaves the same on Python 2 and 3; comparing the result of
            # filter() with [] is always unequal on Python 3.
            self.has_websocket = any(op.is_websocket for op in self.operations)

            processor.process_api(self, context)
            # Callers build lists from the return value of load().
            return self
    
    def get_list_parameter_type(type_string):
        """Returns the type parameter if the given type_string is List[].

        @param type_string: Type string to parse
        @returns Type parameter of the list, or None if not a List.
        """
        # Raw string: '\[' and '\]' are regex escapes, not string escapes.
        list_match = re.match(r'^List\[(.*)\]$', type_string)
        return list_match and list_match.group(1)
    
    
    
    class Property(Stringify):
        """Model of a Swagger property.

        See https://github.com/wordnik/swagger-core/wiki/datatypes
        """

        required_fields = ['type']

        def __init__(self, name):
            self.name = name
            self.type = None
            self.description = None
            self.required = None

        def load(self, property_json, processor, context):
            """Populates this property from its JSON object.

            @param property_json: JSON object describing the property.
            @param processor: SwaggerPostProcessor to invoke on this object and
                              on its type.
            @param context: Current context in the API.
            @return self
            """
            # Required fields are checked against the caller's context, before
            # this property's name is pushed onto the stack.
            validate_required_fields(property_json, self.required_fields, context)

            # Bit of a hack, but properties do not self-identify
            property_context = context.next_stack({'name': self.name}, 'name')

            self.description = property_json.get('description') or ''
            self.required = property_json.get('required') or False

            type_name = property_json.get('type')
            if type_name:
                self.type = SwaggerType().load(
                    type_name, processor, property_context)
            else:
                # Keep whatever falsy value the JSON carried.
                self.type = type_name

            processor.process_property(self, property_context)
            return self
    
    
    class Model(Stringify):
        """Model of a Swagger model.

        See https://github.com/wordnik/swagger-core/wiki/datatypes
        """

        required_fields = ['description', 'properties']

        def __init__(self):
            self.id = None
            self.subtypes = []
            self.__subtype_types = []
            self.notes = None
            self.description = None
            # Empty rather than None, so that properties() is usable before load().
            self.__properties = []
            self.__discriminator = None
            # Base model, set through set_extends_type(); None when there is none.
            self.__extends_type = None

        def load(self, id, model_json, processor, context):
            """Populates this model from its JSON object.

            @param id: Key under which the model was declared.
            @param model_json: JSON object describing the model.
            @param processor: SwaggerPostProcessor to invoke on this object and
                              on its properties.
            @param context: Current context in the API.
            @return self
            @raise SwaggerError: If required fields are missing, the model's id
                                 differs from its key, a Swagger 1.2 feature is
                                 used with an older version, or the
                                 discriminator does not name a property.
            """
            context = context.next_stack(model_json, 'id')
            validate_required_fields(model_json, self.required_fields, context)
            # The duplication of the model's id is required by the Swagger spec.
            self.id = model_json.get('id')
            if id != self.id:
                raise SwaggerError("Model id doesn't match name", context)

            self.subtypes = model_json.get('subTypes') or []
            if self.subtypes and context.version_less_than("1.2"):
                raise SwaggerError("Type extension support added in Swagger 1.2",
                                   context)

            self.description = model_json.get('description')

            # The key is required, but its value may still be null.
            props = (model_json.get('properties') or {}).items()
            self.__properties = [
                Property(k).load(j, processor, context) for (k, j) in props]
            self.__properties = sorted(self.__properties, key=lambda p: p.name)

            discriminator = model_json.get('discriminator')

            if discriminator:
                if context.version_less_than("1.2"):
                    raise SwaggerError("Discriminator support added in Swagger 1.2",
                                       context)

                discr_props = [p for p in self.__properties if p.name == discriminator]
                if not discr_props:
                    raise SwaggerError(
                        "Discriminator '%s' does not name a property of '%s'" % (
                            discriminator, self.id),
                        context)

                self.__discriminator = discr_props[0]

            self.model_json = json.dumps(model_json,
                                         indent=2, separators=(',', ': '))

            processor.process_model(self, context)
            # Callers build and sort lists from the return value of load().
            return self

        def extends(self):
            """Returns the id of the base model, or None if there is none."""
            return self.__extends_type and self.__extends_type.id

        def set_extends_type(self, extends_type):
            """Sets the Model that this model extends."""
            self.__extends_type = extends_type

        def set_subtype_types(self, subtype_types):
            """Sets the list of Model objects that directly extend this model."""
            self.__subtype_types = subtype_types

        def discriminator(self):
            """Returns the discriminator, digging through base types if needed.
            """
            return self.__discriminator or \
                self.__extends_type and self.__extends_type.discriminator()

        def properties(self):
            """Returns the base models' properties followed by this model's own.
            """
            base_props = []
            if self.__extends_type:
                base_props = self.__extends_type.properties()
            return base_props + self.__properties

        def has_properties(self):
            """Returns True if the model has any own or inherited properties."""
            return len(self.properties()) > 0

        def all_subtypes(self):
            """Returns the full list of all subtypes, including sub-subtypes.

            The result is sorted by model id.
            """
            res = self.__subtype_types + \
                  [subsubtypes for subtype in self.__subtype_types
                   for subsubtypes in subtype.all_subtypes()]
            return sorted(res, key=lambda m: m.id)

        def has_subtypes(self):
            """Returns True if type has any subtypes.
            """
            return len(self.subtypes) > 0
    
    
    class ApiDeclaration(Stringify):
        """Model class for an API Declaration.

        See https://github.com/wordnik/swagger-core/wiki/API-Declaration
        """

        required_fields = [
            'swaggerVersion', '_author', '_copyright', 'apiVersion', 'basePath',
            'resourcePath', 'apis', 'models'
        ]

        def __init__(self):
            self.swagger_version = None
            self.author = None
            self.copyright = None
            self.api_version = None
            self.base_path = None
            self.resource_path = None
            self.apis = []
            self.models = []

        def load_file(self, api_declaration_file, processor):
            """Loads this API declaration from a JSON file.

            @param api_declaration_file: Path of the JSON file to load.
            @param processor: SwaggerPostProcessor to invoke on loaded objects.
            @return self
            @raise SwaggerError: On any failure; exceptions of other types are
                                 logged to stderr and wrapped.
            """
            context = ParsingContext(None, [api_declaration_file])
            try:
                return self.__load_file(api_declaration_file, processor, context)
            except SwaggerError:
                raise
            except Exception as e:
                # Written via sys.stderr.write so it works on Python 2 and 3.
                sys.stderr.write("Error:  %s\n" % traceback.format_exc())
                raise SwaggerError(
                    "Error loading %s" % api_declaration_file, context, e)

        def __load_file(self, api_declaration_file, processor, context):
            """Parses the file, then checks resourcePath against the file name.
            """
            with open(api_declaration_file) as fp:
                self.load(json.load(fp), processor, context)

            # foo.json is expected to declare resourcePath /api-docs/foo.{format}
            expected_resource_path = '/api-docs/' + \
                os.path.basename(api_declaration_file) \
                .replace(".json", ".{format}")

            if self.resource_path != expected_resource_path:
                sys.stderr.write(
                    "%s != %s\n" % (self.resource_path, expected_resource_path))
                raise SwaggerError("resourcePath has incorrect value", context)

            return self

        def load(self, api_decl_json, processor, context):
            """Loads a resource from a single Swagger resource.json file.

            @param api_decl_json: Parsed JSON object of the API declaration.
            @param processor: SwaggerPostProcessor to invoke on loaded objects.
            @param context: Current context in the API.
            @return self
            @raise SwaggerError: If the Swagger version is unsupported, required
                                 fields are missing, two APIs share a path, or a
                                 model names a subtype that does not exist.
            """
            # If the version doesn't match, all bets are off.
            self.swagger_version = api_decl_json.get('swaggerVersion')
            context = context.next(version=self.swagger_version)
            if self.swagger_version not in SWAGGER_VERSIONS:
                raise SwaggerError(
                    "Unsupported Swagger version %s" % self.swagger_version, context)

            validate_required_fields(api_decl_json, self.required_fields, context)

            self.author = api_decl_json.get('_author')
            self.copyright = api_decl_json.get('_copyright')
            self.api_version = api_decl_json.get('apiVersion')
            self.base_path = api_decl_json.get('basePath')
            self.resource_path = api_decl_json.get('resourcePath')
            api_json = api_decl_json.get('apis') or []
            self.apis = [
                Api().load(j, processor, context) for j in api_json]

            paths = set()
            for api in self.apis:
                if api.path in paths:
                    raise SwaggerError("API with duplicated path: %s" % api.path, context)
                paths.add(api.path)

            # True when at least one API has a websocket operation. The former
            # 'filter(...) == []' test yielded the opposite.
            self.has_websocket = any(api.has_websocket for api in self.apis)

            # The key is required, but its value may still be null.
            models = (api_decl_json.get('models') or {}).items()
            self.models = [Model().load(model_id, model_json, processor, context)
                           for (model_id, model_json) in models]
            self.models = sorted(self.models, key=lambda m: m.id)

            # Now link all base/extended types
            model_dict = dict((m.id, m) for m in self.models)
            for model in self.models:
                linked = []
                for name in model.subtypes:
                    subtype = model_dict.get(name)
                    if not subtype:
                        raise SwaggerError(
                            "%s has non-existing subtype %s" % (model.id, name),
                            context)
                    subtype.set_extends_type(model)
                    linked.append(subtype)
                if linked:
                    model.set_subtype_types(linked)

            return self
    
    
    class ResourceApi(Stringify):
        """Model of an API listing in the resources.json file.
        """

        required_fields = ['path', 'description']

        def __init__(self):
            self.path = None
            self.description = None
            self.api_declaration = None
            # Path of the API declaration file; set by load_api_declaration().
            self.file = None

        def load(self, api_json, processor, context):
            """Populates this listing entry from its JSON object.

            @param api_json: JSON object describing the resource API.
            @param processor: SwaggerPostProcessor to invoke on this object.
            @param context: Current context in the API.
            @return self
            @raise SwaggerError: If required fields are missing or the path does
                                 not start with '/'.
            """
            context = context.next_stack(api_json, 'path')
            validate_required_fields(api_json, self.required_fields, context)
            self.path = api_json['path']
            self.description = api_json['description']

            if not self.path or self.path[0] != '/':
                raise SwaggerError("Path must start with /", context)

            processor.process_resource_api(self, context)
            return self

        def load_api_declaration(self, base_dir, processor):
            """Loads the API declaration file that this entry refers to.

            The file name is base_dir + path, with '{format}' replaced by 'json'.
            The processor is invoked on this object again once the declaration
            has been loaded.

            @param base_dir: Directory prefix prepended to this entry's path.
            @param processor: SwaggerPostProcessor to invoke on loaded objects.
            """
            self.file = (base_dir + self.path).replace('{format}', 'json')
            self.api_declaration = ApiDeclaration().load_file(self.file, processor)
            # Hand the processor a real ParsingContext, as every other hook
            # invocation does, instead of a bare list.
            processor.process_resource_api(
                self, ParsingContext(None, [self.file]))
    
    
    
    class ResourceListing(Stringify):
        """Model of Swagger's resources.json file.
        """

        required_fields = ['apiVersion', 'basePath', 'apis']

        def __init__(self):
            self.swagger_version = None
            self.api_version = None
            self.base_path = None
            self.apis = None

        def load_file(self, resource_file, processor):
            """Loads this resource listing from a JSON file.

            @param resource_file: Path of the resources.json file to load.
            @param processor: SwaggerPostProcessor to invoke on loaded objects.
            @return self
            @raise SwaggerError: On any failure; exceptions of other types are
                                 logged to stderr and wrapped.
            """
            context = ParsingContext(None, [resource_file])
            try:
                return self.__load_file(resource_file, processor, context)
            except SwaggerError:
                raise
            except Exception as e:
                # Written via sys.stderr.write so it works on Python 2 and 3.
                sys.stderr.write("Error:  %s\n" % traceback.format_exc())
                raise SwaggerError(
                    "Error loading %s" % resource_file, context, e)

        def __load_file(self, resource_file, processor, context):
            """Parses the file as JSON and loads the model from it."""
            with open(resource_file) as fp:
                return self.load(json.load(fp), processor, context)

        def load(self, resources_json, processor, context):
            """Populates this listing from the parsed resources.json object.

            @param resources_json: Parsed JSON object of the resource listing.
            @param processor: SwaggerPostProcessor to invoke on loaded objects.
            @param context: Current context in the API.
            @return self
            @raise SwaggerError: If the Swagger version is unsupported or
                                 required fields are missing.
            """
            # If the version doesn't match, all bets are off.
            self.swagger_version = resources_json.get('swaggerVersion')
            if self.swagger_version not in SWAGGER_VERSIONS:
                # The version lives on self; there is no local 'swagger_version'.
                raise SwaggerError(
                    "Unsupported Swagger version %s" % self.swagger_version,
                    context)

            validate_required_fields(resources_json, self.required_fields, context)
            self.api_version = resources_json['apiVersion']
            self.base_path = resources_json['basePath']
            apis_json = resources_json['apis']
            self.apis = [
                ResourceApi().load(j, processor, context) for j in apis_json]

            processor.process_resource_listing(self, context)
            return self
    
    
    def validate_required_fields(json, required_fields, context):
        """Checks a JSON object for a set of required fields.

        Only the presence of each key is checked, not its value.

        @param json: JSON object to check.
        @param required_fields: List of required fields.
        @param context: Current context in the API.
        @raise SwaggerError: If any required field is missing; the message
                             lists the missing fields in required_fields order.
        """
        absent = []
        for field in required_fields:
            if field not in json:
                absent.append(field)

        if not absent:
            return

        raise SwaggerError(
            "Missing fields: %s" % ', '.join(absent), context)