import copy
import json
import logging
import warnings
import yaml
from django.utils.encoding import force_bytes
try:
from swagger_spec_validator.common import SwaggerValidationError as SSVErr
from swagger_spec_validator.validator20 import validate_spec as validate_ssv
except ImportError: # pragma: no cover
validate_ssv = None
try:
from flex.core import parse as validate_flex
from flex.exceptions import ValidationError
except ImportError: # pragma: no cover
validate_flex = None
from . import openapi
from .errors import SwaggerValidationError
logger = logging.getLogger(__name__)
[docs]
def _validate_flex(spec):
try:
validate_flex(spec)
except ValidationError as ex:
raise SwaggerValidationError(str(ex)) from ex
[docs]
def _validate_swagger_spec_validator(spec):
try:
validate_ssv(spec)
except SSVErr as ex:
raise SwaggerValidationError(str(ex)) from ex
#:
VALIDATORS = {
"flex": _validate_flex if validate_flex else lambda s: None,
"ssv": _validate_swagger_spec_validator if validate_ssv else lambda s: None,
}
[docs]
class _OpenAPICodec:
media_type = None
def __init__(self, validators):
self._validators = validators
@property
def validators(self):
"""List of validator names to apply"""
return self._validators
[docs]
def encode(self, document):
"""Transform an :class:`.Swagger` object to a sequence of bytes.
Also performs validation and applies settings.
:param openapi.Swagger document: Swagger spec object as generated by
:class:`.OpenAPISchemaGenerator`
:return: binary encoding of ``document``
:rtype: bytes
"""
if not isinstance(document, openapi.Swagger):
raise TypeError("Expected a `openapi.Swagger` instance")
spec = self.generate_swagger_object(document)
errors = {}
for validator in self.validators:
try:
# validate a deepcopy of the spec to prevent the validator from messing
# with it for example, swagger_spec_validator adds an x-scope property
# to all references
VALIDATORS[validator](copy.deepcopy(spec))
except SwaggerValidationError as e:
errors[validator] = str(e)
if errors:
exc = SwaggerValidationError(
"spec validation failed: {}".format(errors), errors, spec, self
)
logger.warning(str(exc))
raise exc
return force_bytes(self._dump_dict(spec))
[docs]
def encode_error(self, err):
"""Dump an error message into an encoding-appropriate sequence of bytes"""
return force_bytes(self._dump_dict(err))
[docs]
def _dump_dict(self, spec):
"""Dump the given dictionary into its string representation.
:param dict spec: a python dict
:return: string representation of ``spec``
:rtype: str or bytes
"""
raise NotImplementedError("override this method")
[docs]
def generate_swagger_object(self, swagger):
"""Generates the root Swagger object.
:param openapi.Swagger swagger: Swagger spec object as generated by
:class:`.OpenAPISchemaGenerator`
:return: swagger spec as dict
:rtype: dict
"""
return swagger.as_dict()
[docs]
class OpenAPICodecJson(_OpenAPICodec):
media_type = "application/json"
def __init__(self, validators, pretty=False, media_type="application/json"):
super(OpenAPICodecJson, self).__init__(validators)
self.pretty = pretty
self.media_type = media_type
[docs]
def _dump_dict(self, spec):
"""Dump ``spec`` into JSON.
:rtype: str"""
if self.pretty:
return f"{json.dumps(spec, indent=4, separators=(',', ': '), ensure_ascii=False)}\n" # noqa: E501
else:
return json.dumps(spec, ensure_ascii=False)
_YamlDumper = getattr(yaml, "CSafeDumper", yaml.SafeDumper)
YamlLoader = getattr(yaml, "CSafeLoader", yaml.SafeLoader)
[docs]
class YamlDumper(_YamlDumper):
"""YamlDumper class usable for dumping ``dict`` and list instances in a
standard way."""
[docs]
def ignore_aliases(self, data):
"""Disable YAML references."""
return True
[docs]
def represent_text(self, text):
if "\n" in text:
return self.represent_scalar("tag:yaml.org,2002:str", text, style="|")
return self.represent_scalar("tag:yaml.org,2002:str", text)
YamlDumper.add_representer(bytes, YamlDumper.represent_text)
YamlDumper.add_representer(str, YamlDumper.represent_text)
class SaneYamlDumper(YamlDumper):
def __init__(self, *args, **kwargs) -> None:
warnings.warn(
"SaneYamlDumper has been renamed to YamlDumper", DeprecationWarning
)
super().__init__(*args, **kwargs)
[docs]
def yaml_dump(data, binary):
"""Dump the given data dictionary into a format which is easier to read:
* multi-line mapping style instead of json-like inline style
* list elements are indented into their parents
* YAML references/aliases are disabled
:param dict data: the data to be dumped
:param bool binary: True to return a utf-8 encoded binary object, False to return a
string
:return: the serialized YAML
:rtype: str or bytes
"""
return yaml.dump(
data,
Dumper=YamlDumper,
default_flow_style=False,
encoding="utf-8" if binary else None,
allow_unicode=binary,
sort_keys=False,
)
[docs]
def yaml_sane_dump(data, binary):
warnings.warn("yaml_sane_dump has been renamed to yaml_dump", DeprecationWarning)
return yaml_dump(data, binary)
class SaneYamlLoader(YamlLoader):
def __init__(self, *args, **kwargs) -> None:
warnings.warn(
"SaneYamlLoader is deprecated and custom loading is no longer needed. "
"YamlLoader has been exported to easily import CSafeLoader and fall back "
"to SafeLoader if unavailable",
DeprecationWarning,
)
super().__init__(*args, **kwargs)
[docs]
def yaml_load(stream):
"""Load the given YAML stream.
:param stream: YAML stream (can be a string or a file-like object)
:rtype: dict
"""
return yaml.load(stream, Loader=YamlLoader)
[docs]
def yaml_sane_load(stream):
warnings.warn("yaml_sane_load has been renamed to yaml_load", DeprecationWarning)
return yaml_load(stream)
[docs]
class OpenAPICodecYaml(_OpenAPICodec):
media_type = "application/yaml"
def __init__(self, validators, media_type="application/yaml"):
super(OpenAPICodecYaml, self).__init__(validators)
self.media_type = media_type
[docs]
def _dump_dict(self, spec):
"""Dump ``spec`` into YAML.
:rtype: bytes"""
return yaml_dump(spec, binary=True)