1
1
"""OpenAPI core validation validators module"""
2
+ import re
2
3
from functools import cached_property
3
4
from typing import Any
4
5
from typing import Mapping
23
24
)
24
25
from openapi_core .protocols import Request
25
26
from openapi_core .protocols import WebhookRequest
26
- from openapi_core .schema .parameters import get_value
27
+ from openapi_core .schema .parameters import get_aslist
28
+ from openapi_core .schema .parameters import get_deep_object_value
29
+ from openapi_core .schema .parameters import get_explode
30
+ from openapi_core .schema .parameters import get_style
31
+ from openapi_core .schema .protocols import SuportsGetAll
32
+ from openapi_core .schema .protocols import SuportsGetList
27
33
from openapi_core .spec import Spec
28
34
from openapi_core .templating .media_types .datatypes import MediaType
29
35
from openapi_core .templating .paths .datatypes import PathOperationServer
@@ -70,10 +76,14 @@ def __init__(
70
76
self .extra_format_validators = extra_format_validators
71
77
self .extra_media_type_deserializers = extra_media_type_deserializers
72
78
73
- def _get_media_type (self , content : Spec , mimetype : str ) -> MediaType :
79
+ def _find_media_type (
80
+ self , content : Spec , mimetype : Optional [str ] = None
81
+ ) -> MediaType :
74
82
from openapi_core .templating .media_types .finders import MediaTypeFinder
75
83
76
84
finder = MediaTypeFinder (content )
85
+ if mimetype is None :
86
+ return finder .get_first ()
77
87
return finder .find (mimetype )
78
88
79
89
def _deserialise_media_type (self , mimetype : str , value : Any ) -> Any :
@@ -99,69 +109,93 @@ def _validate_schema(self, schema: Spec, value: Any) -> None:
99
109
)
100
110
validator .validate (value )
101
111
102
- def _get_param_or_header_value (
112
+ def _get_param_or_header (
103
113
self ,
104
114
param_or_header : Spec ,
105
115
location : Mapping [str , Any ],
106
116
name : Optional [str ] = None ,
107
117
) -> Any :
108
- casted , schema = self ._get_param_or_header_value_and_schema (
109
- param_or_header , location , name
118
+ # Simple scenario
119
+ if "content" not in param_or_header :
120
+ return self ._get_simple_param_or_header (
121
+ param_or_header , location , name = name
122
+ )
123
+
124
+ # Complex scenario
125
+ return self ._get_complex_param_or_header (
126
+ param_or_header , location , name = name
127
+ )
128
+
129
+ def _get_simple_param_or_header (
130
+ self ,
131
+ param_or_header : Spec ,
132
+ location : Mapping [str , Any ],
133
+ name : Optional [str ] = None ,
134
+ ) -> Any :
135
+ try :
136
+ raw = self ._get_style_value (param_or_header , location , name = name )
137
+ except KeyError :
138
+ # in simple scenrios schema always exist
139
+ schema = param_or_header / "schema"
140
+ if "default" not in schema :
141
+ raise
142
+ raw = schema ["default" ]
143
+ return self ._convert_schema_style_value (raw , param_or_header )
144
+
145
+ def _get_complex_param_or_header (
146
+ self ,
147
+ param_or_header : Spec ,
148
+ location : Mapping [str , Any ],
149
+ name : Optional [str ] = None ,
150
+ ) -> Any :
151
+ content = param_or_header / "content"
152
+ # no point to catch KetError
153
+ # in complex scenrios schema doesn't exist
154
+ raw = self ._get_media_type_value (param_or_header , location , name = name )
155
+ return self ._convert_content_schema_value (raw , content )
156
+
157
+ def _convert_schema_style_value (
158
+ self ,
159
+ raw : Any ,
160
+ param_or_header : Spec ,
161
+ ) -> Any :
162
+ casted , schema = self ._convert_schema_style_value_and_schema (
163
+ raw , param_or_header
110
164
)
111
165
if schema is None :
112
166
return casted
113
167
self ._validate_schema (schema , casted )
114
168
return casted
115
169
116
- def _get_content_value (
117
- self , raw : Any , mimetype : str , content : Spec
170
+ def _convert_content_schema_value (
171
+ self , raw : Any , content : Spec , mimetype : Optional [ str ] = None
118
172
) -> Any :
119
- casted , schema = self ._get_content_value_and_schema (
120
- raw , mimetype , content
173
+ casted , schema = self ._convert_content_schema_value_and_schema (
174
+ raw , content , mimetype
121
175
)
122
176
if schema is None :
123
177
return casted
124
178
self ._validate_schema (schema , casted )
125
179
return casted
126
180
127
- def _get_param_or_header_value_and_schema (
181
+ def _convert_schema_style_value_and_schema (
128
182
self ,
183
+ raw : Any ,
129
184
param_or_header : Spec ,
130
- location : Mapping [str , Any ],
131
- name : Optional [str ] = None ,
132
185
) -> Tuple [Any , Spec ]:
133
- try :
134
- raw_value = get_value (param_or_header , location , name = name )
135
- except KeyError :
136
- if "schema" not in param_or_header :
137
- raise
138
- schema = param_or_header / "schema"
139
- if "default" not in schema :
140
- raise
141
- casted = schema ["default" ]
142
- else :
143
- # Simple scenario
144
- if "content" not in param_or_header :
145
- deserialised = self ._deserialise_style (
146
- param_or_header , raw_value
147
- )
148
- schema = param_or_header / "schema"
149
- # Complex scenario
150
- else :
151
- content = param_or_header / "content"
152
- mimetype , media_type = next (content .items ())
153
- deserialised = self ._deserialise_media_type (
154
- mimetype , raw_value
155
- )
156
- schema = media_type / "schema"
157
- casted = self ._cast (schema , deserialised )
186
+ deserialised = self ._deserialise_style (param_or_header , raw )
187
+ schema = param_or_header / "schema"
188
+ casted = self ._cast (schema , deserialised )
158
189
return casted , schema
159
190
160
- def _get_content_value_and_schema (
161
- self , raw : Any , mimetype : str , content : Spec
191
+ def _convert_content_schema_value_and_schema (
192
+ self ,
193
+ raw : Any ,
194
+ content : Spec ,
195
+ mimetype : Optional [str ] = None ,
162
196
) -> Tuple [Any , Optional [Spec ]]:
163
- media_type , mimetype = self ._get_media_type (content , mimetype )
164
- deserialised = self ._deserialise_media_type (mimetype , raw )
197
+ media_type , mime_type = self ._find_media_type (content , mimetype )
198
+ deserialised = self ._deserialise_media_type (mime_type , raw )
165
199
casted = self ._cast (media_type , deserialised )
166
200
167
201
if "schema" not in media_type :
@@ -170,6 +204,45 @@ def _get_content_value_and_schema(
170
204
schema = media_type / "schema"
171
205
return casted , schema
172
206
207
+ def _get_style_value (
208
+ self ,
209
+ param_or_header : Spec ,
210
+ location : Mapping [str , Any ],
211
+ name : Optional [str ] = None ,
212
+ ) -> Any :
213
+ name = name or param_or_header ["name" ]
214
+ style = get_style (param_or_header )
215
+ if name not in location :
216
+ # Only check if the name is not in the location if the style of
217
+ # the param is deepObject,this is because deepObjects will never be found
218
+ # as their key also includes the properties of the object already.
219
+ if style != "deepObject" :
220
+ raise KeyError
221
+ keys_str = " " .join (location .keys ())
222
+ if not re .search (rf"{ name } \[\w+\]" , keys_str ):
223
+ raise KeyError
224
+
225
+ aslist = get_aslist (param_or_header )
226
+ explode = get_explode (param_or_header )
227
+ if aslist and explode :
228
+ if style == "deepObject" :
229
+ return get_deep_object_value (location , name )
230
+ if isinstance (location , SuportsGetAll ):
231
+ return location .getall (name )
232
+ if isinstance (location , SuportsGetList ):
233
+ return location .getlist (name )
234
+
235
+ return location [name ]
236
+
237
+ def _get_media_type_value (
238
+ self ,
239
+ param_or_header : Spec ,
240
+ location : Mapping [str , Any ],
241
+ name : Optional [str ] = None ,
242
+ ) -> Any :
243
+ name = name or param_or_header ["name" ]
244
+ return location [name ]
245
+
173
246
174
247
class BaseAPICallValidator (BaseValidator ):
175
248
@cached_property
0 commit comments