Skip to content

Commit 3a7d3a6

Browse files
authored
Add MODELSTORE and MODELEXECUTE (#60)
* Add support for the new commands: AI.MODELEXECUTE (AI.MODELRUN is now deprecated) and AI.MODELSTORE (AI.MODELSET is now deprecated). * Add deprecated to requirements * Formatting * PR fixes, among them: bring back the documentation for the deprecated modelset and modelrun. Validate that all required arguments appear in the new commands (+test it) * Make fixes in tests to suit the change in "AI.modelget"
1 parent 6486d6e commit 3a7d3a6

9 files changed

+422
-81
lines changed

redisai/client.py

+133-6
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1-
from functools import wraps, partial
2-
from typing import Union, AnyStr, ByteString, List, Sequence, Any
31
import warnings
2+
from functools import partial, wraps
3+
from typing import Any, AnyStr, ByteString, List, Sequence, Union
44

5-
from redis import StrictRedis
65
import numpy as np
6+
from deprecated import deprecated
7+
from redis import StrictRedis
78

89
from redisai import command_builder as builder
910
from redisai.dag import Dag
1011
from redisai.pipeline import Pipeline
1112
from redisai.postprocessor import Processor
1213

13-
1414
processor = Processor()
1515

1616

@@ -96,7 +96,7 @@ def dag(
9696
-------
9797
>>> con.tensorset('tensor', ...)
9898
'OK'
99-
>>> con.modelset('model', ...)
99+
>>> con.modelstore('model', ...)
100100
'OK'
101101
>>> dag = con.dag(load=['tensor'], persist=['output'])
102102
>>> dag.tensorset('another', ...)
@@ -136,6 +136,83 @@ def loadbackend(self, identifier: AnyStr, path: AnyStr) -> str:
136136
res = self.execute_command(*args)
137137
return res if not self.enable_postprocess else processor.loadbackend(res)
138138

139+
def modelstore(
140+
self,
141+
key: AnyStr,
142+
backend: str,
143+
device: str,
144+
data: ByteString,
145+
batch: int = None,
146+
minbatch: int = None,
147+
minbatchtimeout: int = None,
148+
tag: AnyStr = None,
149+
inputs: Union[AnyStr, List[AnyStr]] = None,
150+
outputs: Union[AnyStr, List[AnyStr]] = None,
151+
) -> str:
152+
"""
153+
Set the model on provided key.
154+
155+
Parameters
156+
----------
157+
key : AnyStr
158+
Key name
159+
backend : str
160+
Backend name. Allowed backends are TF, TORCH, TFLITE, ONNX
161+
device : str
162+
Device name. Allowed devices are CPU and GPU. If multiple GPUs are available,
163+
it can be specified using the format GPU:<gpu number>. For example: GPU:0
164+
data : bytes
165+
Model graph read as bytes string
166+
batch : int
167+
Number of batches for doing auto-batching
168+
minbatch : int
169+
Minimum number of samples required in a batch for model execution
170+
minbatchtimeout : int
171+
The max number of milliseconds for which the engine will not trigger an execution
172+
if the number of samples is lower than minbatch (after minbatchtimeout is passed,
173+
the execution will start even if minbatch has not been reached)
174+
tag : AnyStr
175+
Any string that will be saved in RedisAI as tag for the model
176+
inputs : Union[AnyStr, List[AnyStr]]
177+
Input node(s) in the graph. Required only for Tensorflow graphs
178+
outputs : Union[AnyStr, List[AnyStr]]
179+
Output node(s) in the graph. Required only for Tensorflow graphs
180+
181+
Returns
182+
-------
183+
str
184+
'OK' if success, raise an exception otherwise
185+
186+
Example
187+
-------
188+
>>> # Torch model
189+
>>> model_path = os.path.join('path/to/TorchScriptModel.pt')
190+
>>> model = open(model_path, 'rb').read()
191+
>>> con.modelstore("model", 'torch', 'cpu', model, tag='v1.0')
192+
'OK'
193+
>>> # Tensorflow model
194+
>>> model_path = os.path.join('/path/to/tf_frozen_graph.pb')
195+
>>> model = open(model_path, 'rb').read()
196+
>>> con.modelstore('m', 'tf', 'cpu', model,
197+
... inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
198+
'OK'
199+
"""
200+
args = builder.modelstore(
201+
key,
202+
backend,
203+
device,
204+
data,
205+
batch,
206+
minbatch,
207+
minbatchtimeout,
208+
tag,
209+
inputs,
210+
outputs,
211+
)
212+
res = self.execute_command(*args)
213+
return res if not self.enable_postprocess else processor.modelstore(res)
214+
215+
@deprecated(version="1.2.0", reason="Use modelstore instead")
139216
def modelset(
140217
self,
141218
key: AnyStr,
@@ -247,6 +324,56 @@ def modeldel(self, key: AnyStr) -> str:
247324
res = self.execute_command(*args)
248325
return res if not self.enable_postprocess else processor.modeldel(res)
249326

327+
def modelexecute(
328+
self,
329+
key: AnyStr,
330+
inputs: Union[AnyStr, List[AnyStr]],
331+
outputs: Union[AnyStr, List[AnyStr]],
332+
timeout: int = None,
333+
) -> str:
334+
"""
335+
Run the model using input(s) which are already in the scope and are associated
336+
to some keys. Modelexecute also needs the output key name(s) to store the output
337+
from the model. The number of outputs from the model and the number of keys
338+
provided here must be same. Otherwise, RedisAI throws an error
339+
340+
Parameters
341+
----------
342+
key : str
343+
Model key to run
344+
inputs : Union[AnyStr, List[AnyStr]]
345+
Tensor(s) which is already saved in the RedisAI using a tensorset call. These
346+
tensors will be used as the inputs for the modelexecute
347+
outputs : Union[AnyStr, List[AnyStr]]
348+
Keys on which the outputs are to be saved. If those keys exist already,
349+
modelexecute will overwrite them with new values
350+
timeout : int
351+
The max number of milliseconds that may pass before the request is processed
352+
(meaning that the result will not be computed after that time and TIMEDOUT
353+
is returned in that case)
354+
355+
Returns
356+
-------
357+
str
358+
'OK' if success, raise an exception otherwise
359+
360+
Example
361+
-------
362+
>>> con.modelstore('m', 'tf', 'cpu', model_pb,
363+
... inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
364+
'OK'
365+
>>> con.tensorset('a', (2, 3), dtype='float')
366+
'OK'
367+
>>> con.tensorset('b', (2, 3), dtype='float')
368+
'OK'
369+
>>> con.modelexecute('m', ['a', 'b'], ['c'])
370+
'OK'
371+
"""
372+
args = builder.modelexecute(key, inputs, outputs, timeout)
373+
res = self.execute_command(*args)
374+
return res if not self.enable_postprocess else processor.modelexecute(res)
375+
376+
@deprecated(version="1.2.0", reason="Use modelexecute instead")
250377
def modelrun(
251378
self,
252379
key: AnyStr,
@@ -277,7 +404,7 @@ def modelrun(
277404
278405
Example
279406
-------
280-
>>> con.modelset('m', 'tf', 'cpu', model_pb,
407+
>>> con.modelstore('m', 'tf', 'cpu', model_pb,
281408
... inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
282409
'OK'
283410
>>> con.tensorset('a', (2, 3), dtype='float')

redisai/command_builder.py

+94-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
from typing import Union, AnyStr, ByteString, List, Sequence
1+
from typing import AnyStr, ByteString, List, Sequence, Union
2+
23
import numpy as np
4+
35
from . import utils
46

57
# TODO: mypy check
@@ -9,29 +11,90 @@ def loadbackend(identifier: AnyStr, path: AnyStr) -> Sequence:
911
return "AI.CONFIG LOADBACKEND", identifier, path
1012

1113

12-
def modelset(
14+
def modelstore(
1315
name: AnyStr,
1416
backend: str,
1517
device: str,
1618
data: ByteString,
1719
batch: int,
1820
minbatch: int,
21+
minbatchtimeout: int,
1922
tag: AnyStr,
2023
inputs: Union[AnyStr, List[AnyStr]],
2124
outputs: Union[AnyStr, List[AnyStr]],
2225
) -> Sequence:
26+
if name is None:
27+
raise ValueError("Model name was not given")
2328
if device.upper() not in utils.allowed_devices:
2429
raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}")
2530
if backend.upper() not in utils.allowed_backends:
2631
raise ValueError(f"Backend not allowed. Use any from {utils.allowed_backends}")
27-
args = ["AI.MODELSET", name, backend, device]
32+
args = ["AI.MODELSTORE", name, backend, device]
2833

34+
if tag is not None:
35+
args += ["TAG", tag]
2936
if batch is not None:
3037
args += ["BATCHSIZE", batch]
3138
if minbatch is not None:
39+
if batch is None:
40+
raise ValueError("Minbatch is not allowed without batch")
3241
args += ["MINBATCHSIZE", minbatch]
42+
if minbatchtimeout is not None:
43+
if minbatch is None:
44+
raise ValueError("Minbatchtimeout is not allowed without minbatch")
45+
args += ["MINBATCHTIMEOUT", minbatchtimeout]
46+
47+
if backend.upper() == "TF":
48+
if not all((inputs, outputs)):
49+
raise ValueError(
50+
"Require keyword arguments inputs and outputs for TF models"
51+
)
52+
args += [
53+
"INPUTS",
54+
len(inputs) if isinstance(inputs, List) else 1,
55+
*utils.listify(inputs),
56+
]
57+
args += [
58+
"OUTPUTS",
59+
len(outputs) if isinstance(outputs, List) else 1,
60+
*utils.listify(outputs),
61+
]
62+
elif inputs is not None or outputs is not None:
63+
raise ValueError(
64+
"Inputs and outputs keywords should not be specified for this backend"
65+
)
66+
chunk_size = 500 * 1024 * 1024 # TODO: this should be configurable.
67+
data_chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
68+
# TODO: need a test case for this
69+
args += ["BLOB", *data_chunks]
70+
return args
71+
72+
73+
def modelset(
74+
name: AnyStr,
75+
backend: str,
76+
device: str,
77+
data: ByteString,
78+
batch: int,
79+
minbatch: int,
80+
tag: AnyStr,
81+
inputs: Union[AnyStr, List[AnyStr]],
82+
outputs: Union[AnyStr, List[AnyStr]],
83+
) -> Sequence:
84+
if device.upper() not in utils.allowed_devices:
85+
raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}")
86+
if backend.upper() not in utils.allowed_backends:
87+
raise ValueError(f"Backend not allowed. Use any from {utils.allowed_backends}")
88+
args = ["AI.MODELSET", name, backend, device]
89+
3390
if tag is not None:
3491
args += ["TAG", tag]
92+
if batch is not None:
93+
args += ["BATCHSIZE", batch]
94+
if minbatch is not None:
95+
if batch is None:
96+
raise ValueError("Minbatch is not allowed without batch")
97+
args += ["MINBATCHSIZE", minbatch]
3598

3699
if backend.upper() == "TF":
37100
if not (all((inputs, outputs))):
@@ -56,7 +119,34 @@ def modeldel(name: AnyStr) -> Sequence:
56119
return "AI.MODELDEL", name
57120

58121

59-
def modelrun(name: AnyStr, inputs: List[AnyStr], outputs: List[AnyStr]) -> Sequence:
122+
def modelexecute(
123+
name: AnyStr,
124+
inputs: Union[AnyStr, List[AnyStr]],
125+
outputs: Union[AnyStr, List[AnyStr]],
126+
timeout: int,
127+
) -> Sequence:
128+
if name is None or inputs is None or outputs is None:
129+
raise ValueError("Missing required arguments for model execute command")
130+
args = [
131+
"AI.MODELEXECUTE",
132+
name,
133+
"INPUTS",
134+
len(utils.listify(inputs)),
135+
*utils.listify(inputs),
136+
"OUTPUTS",
137+
len(utils.listify(outputs)),
138+
*utils.listify(outputs),
139+
]
140+
if timeout is not None:
141+
args += ["TIMEOUT", timeout]
142+
return args
143+
144+
145+
def modelrun(
146+
name: AnyStr,
147+
inputs: Union[AnyStr, List[AnyStr]],
148+
outputs: Union[AnyStr, List[AnyStr]],
149+
) -> Sequence:
60150
args = (
61151
"AI.MODELRUN",
62152
name,

redisai/dag.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
from functools import partial
2-
from typing import AnyStr, Union, Sequence, Any, List
2+
from typing import Any, AnyStr, List, Sequence, Union
33

44
import numpy as np
55

6-
from redisai.postprocessor import Processor
76
from redisai import command_builder as builder
8-
7+
from redisai.postprocessor import Processor
98

109
processor = Processor()
1110

redisai/pipeline.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
import warnings
22
from functools import partial
3-
from typing import AnyStr, Union, Sequence
3+
from typing import AnyStr, Sequence, Union
44

55
import numpy as np
6+
import redis
67

78
from redisai import command_builder as builder
8-
import redis
99
from redisai.postprocessor import Processor
1010

11-
1211
processor = Processor()
1312

1413

redisai/postprocessor.py

+2
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,10 @@ def infoget(res):
6363
decoder = staticmethod(decoder)
6464
decoding_functions = (
6565
"loadbackend",
66+
"modelstore",
6667
"modelset",
6768
"modeldel",
69+
"modelexecute",
6870
"modelrun",
6971
"tensorset",
7072
"scriptset",

redisai/utils.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
from typing import Union, ByteString, Sequence, List, AnyStr, Callable
2-
import numpy as np
1+
from typing import AnyStr, ByteString, Callable, List, Sequence, Union
32

3+
import numpy as np
44

55
dtype_dict = {
66
"float": "FLOAT",

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
author="RedisLabs",
1616
author_email="[email protected]",
1717
packages=find_packages(),
18-
install_requires=["redis", "hiredis", "numpy"],
18+
install_requires=["redis", "hiredis", "numpy", "deprecated"],
1919
python_requires=">=3.6",
2020
classifiers=[
2121
"Development Status :: 4 - Beta",

test-requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ nose
66
codecov
77
numpy
88
ml2rt
9+
deprecated

0 commit comments

Comments
 (0)