diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 0ca4b5d..cdb366b 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.0.1 +current_version = 1.0.2 commit = True tag = False parse = (?P\d+)\.(?P\d+)\.(?P\d+)(\.(?P[a-z]+)(?P\d+))? @@ -7,14 +7,13 @@ serialize = {major}.{minor}.{patch} [bumpversion:file:setup.py] -search = version='{current_version}' -replace = version='{new_version}' +search = version="{current_version}" +replace = version="{new_version}" [bumpversion:file:redisai/__init__.py] -search = __version__ = '{current_version}' -replace = __version__ = '{new_version}' +search = __version__ = "{current_version}" +replace = __version__ = "{new_version}" [bumpversion:file:docs/conf.py] -search = release = '{current_version}' -replace = release = '{new_version}' - +search = release = "{current_version}" +replace = release = "{new_version}" diff --git a/.gitignore b/.gitignore index eefb1d8..343d72a 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ redisai.egg-info build/ dist/ docs/_build/ +.DS_Store diff --git a/docs/conf.py b/docs/conf.py index 3c7ad48..9897f38 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,26 +1,28 @@ -project = 'redisai-py' -copyright = '2020, RedisLabs' -author = 'RedisLabs' -release = '1.0.1' -extensions = ['sphinx.ext.autodoc', - 'sphinx.ext.autosummary', - 'sphinx.ext.extlinks', - 'sphinx.ext.napoleon', - 'sphinx.ext.todo', - 'sphinx.ext.intersphinx', - "sphinx_rtd_theme"] -templates_path = ['_templates'] -source_suffix = '.rst' -master_doc = 'index' -exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] -html_theme = 'sphinx_rtd_theme' +project = "redisai-py" +copyright = "2020, RedisLabs" +author = "RedisLabs" +release = "1.0.2" +extensions = [ + "sphinx.ext.autodoc", + "sphinx.ext.autosummary", + "sphinx.ext.extlinks", + "sphinx.ext.napoleon", + "sphinx.ext.todo", + "sphinx.ext.intersphinx", + "sphinx_rtd_theme", +] +templates_path = ["_templates"] +source_suffix = ".rst" +master_doc = "index" +exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] +html_theme = "sphinx_rtd_theme" html_use_smartypants = True -html_last_updated_fmt = '%b %d, %Y' +html_last_updated_fmt = "%b %d, %Y" html_split_index = False html_sidebars = { - '**': ['searchbox.html', 'globaltoc.html', 'sourcelink.html'], + "**": ["searchbox.html", "globaltoc.html", "sourcelink.html"], } -html_short_title = '%s-%s' % (project, release) +html_short_title = "%s-%s" % (project, release) napoleon_use_ivar = True napoleon_use_rtype = True @@ -29,4 +31,4 @@ add_module_names = False doctest_test_doctest_blocks = None -autoclass_content = 'class' +autoclass_content = "class" diff --git a/redisai/__init__.py b/redisai/__init__.py index 9b456cf..9c90c68 100644 --- a/redisai/__init__.py +++ b/redisai/__init__.py @@ -1,3 +1,3 @@ from .client import Client -__version__ = '1.0.1' +__version__ = "1.0.2" diff --git a/redisai/client.py b/redisai/client.py index d3a3e0e..21b09fb 100644 --- a/redisai/client.py +++ b/redisai/client.py @@ -3,11 +3,12 @@ import warnings from redis import StrictRedis -from redis.client import Pipeline as RedisPipeline import numpy as np -from . import command_builder as builder -from .postprocessor import Processor +from redisai import command_builder as builder +from redisai.dag import Dag +from redisai.pipeline import Pipeline +from redisai.postprocessor import Processor processor = Processor() @@ -37,13 +38,16 @@ class Client(StrictRedis): >>> from redisai import Client >>> con = Client(host='localhost', port=6379) """ + + REDISAI_COMMANDS_RESPONSE_CALLBACKS = {} + def __init__(self, debug=False, enable_postprocess=True, *args, **kwargs): super().__init__(*args, **kwargs) if debug: self.execute_command = enable_debug(super().execute_command) self.enable_postprocess = enable_postprocess - def pipeline(self, transaction: bool = True, shard_hint: bool = None) -> 'Pipeline': + def pipeline(self, transaction: bool = True, shard_hint: bool = None) -> "Pipeline": """ It follows the same pipeline implementation of native redis client but enables it to access redisai operation as well. This function is experimental in the @@ -57,13 +61,17 @@ def pipeline(self, transaction: bool = True, shard_hint: bool = None) -> 'Pipeli >>> pipe.execute() [True, b'OK'] """ - return Pipeline(self.enable_postprocess, - self.connection_pool, - self.response_callbacks, - transaction=True, shard_hint=None) + return Pipeline( + self.enable_postprocess, + self.connection_pool, + self.response_callbacks, + transaction=True, + shard_hint=None, + ) - def dag(self, load: Sequence = None, persist: Sequence = None, - readonly: bool = False) -> 'Dag': + def dag( + self, load: Sequence = None, persist: Sequence = None, readonly: bool = False + ) -> "Dag": """ It returns a DAG object on which other DAG-allowed operations can be called. For more details about DAG in RedisAI, refer to the RedisAI documentation. @@ -97,7 +105,9 @@ def dag(self, load: Sequence = None, persist: Sequence = None, >>> # You can even chain the operations >>> result = dag.tensorset(**akwargs).modelrun(**bkwargs).tensorget(**ckwargs).run() """ - return Dag(load, persist, self.execute_command, readonly, self.enable_postprocess) + return Dag( + load, persist, self.execute_command, readonly, self.enable_postprocess + ) def loadbackend(self, identifier: AnyStr, path: AnyStr) -> str: """ @@ -126,16 +136,18 @@ def loadbackend(self, identifier: AnyStr, path: AnyStr) -> str: res = self.execute_command(*args) return res if not self.enable_postprocess else processor.loadbackend(res) - def modelset(self, - key: AnyStr, - backend: str, - device: str, - data: ByteString, - batch: int = None, - minbatch: int = None, - tag: AnyStr = None, - inputs: Union[AnyStr, List[AnyStr]] = None, - outputs: Union[AnyStr, List[AnyStr]] = None) -> str: + def modelset( + self, + key: AnyStr, + backend: str, + device: str, + data: ByteString, + batch: int = None, + minbatch: int = None, + tag: AnyStr = None, + inputs: Union[AnyStr, List[AnyStr]] = None, + outputs: Union[AnyStr, List[AnyStr]] = None, + ) -> str: """ Set the model on provided key. @@ -180,8 +192,9 @@ def modelset(self, ... inputs=['a', 'b'], outputs=['mul'], tag='v1.0') 'OK' """ - args = builder.modelset(key, backend, device, data, - batch, minbatch, tag, inputs, outputs) + args = builder.modelset( + key, backend, device, data, batch, minbatch, tag, inputs, outputs + ) res = self.execute_command(*args) return res if not self.enable_postprocess else processor.modelset(res) @@ -234,10 +247,12 @@ def modeldel(self, key: AnyStr) -> str: res = self.execute_command(*args) return res if not self.enable_postprocess else processor.modeldel(res) - def modelrun(self, - key: AnyStr, - inputs: Union[AnyStr, List[AnyStr]], - outputs: Union[AnyStr, List[AnyStr]]) -> str: + def modelrun( + self, + key: AnyStr, + inputs: Union[AnyStr, List[AnyStr]], + outputs: Union[AnyStr, List[AnyStr]], + ) -> str: """ Run the model using input(s) which are already in the scope and are associated to some keys. Modelrun also needs the output key name(s) to store the output @@ -292,17 +307,22 @@ def modelscan(self) -> List[List[AnyStr]]: >>> con.modelscan() [['pt_model', ''], ['m', 'v1.2']] """ - warnings.warn("Experimental: Model List API is experimental and might change " - "in the future without any notice", UserWarning) + warnings.warn( + "Experimental: Model List API is experimental and might change " + "in the future without any notice", + UserWarning, + ) args = builder.modelscan() res = self.execute_command(*args) return res if not self.enable_postprocess else processor.modelscan(res) - def tensorset(self, - key: AnyStr, - tensor: Union[np.ndarray, list, tuple], - shape: Sequence[int] = None, - dtype: str = None) -> str: + def tensorset( + self, + key: AnyStr, + tensor: Union[np.ndarray, list, tuple], + shape: Sequence[int] = None, + dtype: str = None, + ) -> str: """ Set the tensor to a key in RedisAI @@ -334,9 +354,13 @@ def tensorset(self, res = self.execute_command(*args) return res if not self.enable_postprocess else processor.tensorset(res) - def tensorget(self, - key: AnyStr, as_numpy: bool = True, as_numpy_mutable: bool = False, - meta_only: bool = False) -> Union[dict, np.ndarray]: + def tensorget( + self, + key: AnyStr, + as_numpy: bool = True, + as_numpy_mutable: bool = False, + meta_only: bool = False, + ) -> Union[dict, np.ndarray]: """ Retrieve the value of a tensor from the server. By default it returns the numpy array but it can be controlled using the `as_type` and `meta_only` argument. @@ -371,9 +395,15 @@ def tensorget(self, """ args = builder.tensorget(key, as_numpy, meta_only) res = self.execute_command(*args) - return res if not self.enable_postprocess else processor.tensorget(res, as_numpy, as_numpy_mutable, meta_only) + return ( + res + if not self.enable_postprocess + else processor.tensorget(res, as_numpy, as_numpy_mutable, meta_only) + ) - def scriptset(self, key: AnyStr, device: str, script: str, tag: AnyStr = None) -> str: + def scriptset( + self, key: AnyStr, device: str, script: str, tag: AnyStr = None + ) -> str: """ Set the script to RedisAI. Action similar to Modelset. RedisAI uses the TorchScript engine to execute the script. So the script should have only TorchScript supported @@ -465,12 +495,13 @@ def scriptdel(self, key: AnyStr) -> str: res = self.execute_command(*args) return res if not self.enable_postprocess else processor.scriptdel(res) - def scriptrun(self, - key: AnyStr, - function: AnyStr, - inputs: Union[AnyStr, Sequence[AnyStr]], - outputs: Union[AnyStr, Sequence[AnyStr]] - ) -> str: + def scriptrun( + self, + key: AnyStr, + function: AnyStr, + inputs: Union[AnyStr, Sequence[AnyStr]], + outputs: Union[AnyStr, Sequence[AnyStr]], + ) -> str: """ Run an already set script. Similar to modelrun @@ -516,8 +547,11 @@ def scriptscan(self) -> List[List[AnyStr]]: >>> con.scriptscan() [['ket1', 'v1.0'], ['ket2', '']] """ - warnings.warn("Experimental: Script List API is experimental and might change " - "in the future without any notice", UserWarning) + warnings.warn( + "Experimental: Script List API is experimental and might change " + "in the future without any notice", + UserWarning, + ) args = builder.scriptscan() res = self.execute_command(*args) return res if not self.enable_postprocess else processor.scriptscan(res) @@ -575,118 +609,10 @@ def inforeset(self, key: AnyStr) -> str: return res if not self.enable_postprocess else processor.inforeset(res) -class Pipeline(RedisPipeline, Client): - def __init__(self, enable_postprocess, *args, **kwargs): - warnings.warn("Pipeling AI commands through this client is experimental.", - UserWarning) - self.enable_postprocess = False - if enable_postprocess: - warnings.warn("Postprocessing is enabled but not allowed in pipelines." - "Disable postprocessing to remove this warning.", UserWarning) - self.tensorget_processors = [] - super().__init__(*args, **kwargs) - - def dag(self, *args, **kwargs): - raise RuntimeError("Pipeline object doesn't allow DAG creation currently") - - def tensorget(self, key, as_numpy=True, as_numpy_mutable=False, meta_only=False): - self.tensorget_processors.append(partial(processor.tensorget, - as_numpy=as_numpy, - as_numpy_mutable=as_numpy_mutable, - meta_only=meta_only)) - return super().tensorget(key, as_numpy, as_numpy_mutable, meta_only) - - def _execute_transaction(self, *args, **kwargs): - # TODO: Blocking commands like MODELRUN, SCRIPTRUN and DAGRUN won't work - res = super()._execute_transaction(*args, **kwargs) - for i in range(len(res)): - # tensorget will have minimum 4 values if meta_only = True - if isinstance(res[i], list) and len(res[i]) >= 4: - res[i] = self.tensorget_processors.pop(0)(res[i]) - return res - - def _execute_pipeline(self, *args, **kwargs): - res = super()._execute_pipeline(*args, **kwargs) - for i in range(len(res)): - # tensorget will have minimum 4 values if meta_only = True - if isinstance(res[i], list) and len(res[i]) >= 4: - res[i] = self.tensorget_processors.pop(0)(res[i]) - return res - - -class Dag: - def __init__(self, load, persist, executor, readonly=False, postprocess=True): - self.result_processors = [] - self.enable_postprocess = True - if readonly: - if persist: - raise RuntimeError("READONLY requests cannot write (duh!) and should not " - "have PERSISTing values") - self.commands = ['AI.DAGRUN_RO'] - else: - self.commands = ['AI.DAGRUN'] - if load: - if not isinstance(load, (list, tuple)): - self.commands += ["LOAD", 1, load] - else: - self.commands += ["LOAD", len(load), *load] - if persist: - if not isinstance(persist, (list, tuple)): - self.commands += ["PERSIST", 1, persist, '|>'] - else: - self.commands += ["PERSIST", len(persist), *persist, '|>'] - else: - self.commands.append('|>') - self.executor = executor - - def tensorset(self, - key: AnyStr, - tensor: Union[np.ndarray, list, tuple], - shape: Sequence[int] = None, - dtype: str = None) -> Any: - args = builder.tensorset(key, tensor, shape, dtype) - self.commands.extend(args) - self.commands.append("|>") - self.result_processors.append(bytes.decode) - return self - - def tensorget(self, - key: AnyStr, as_numpy: bool = True, as_numpy_mutable: bool = False, - meta_only: bool = False) -> Any: - args = builder.tensorget(key, as_numpy, as_numpy_mutable) - self.commands.extend(args) - self.commands.append("|>") - self.result_processors.append(partial(processor.tensorget, - as_numpy=as_numpy, - as_numpy_mutable=as_numpy_mutable, - meta_only=meta_only)) - return self - - def modelrun(self, - key: AnyStr, - inputs: Union[AnyStr, List[AnyStr]], - outputs: Union[AnyStr, List[AnyStr]]) -> Any: - args = builder.modelrun(key, inputs, outputs) - self.commands.extend(args) - self.commands.append("|>") - self.result_processors.append(bytes.decode) - return self - - def run(self): - commands = self.commands[:-1] # removing the last "|> - results = self.executor(*commands) - if self.enable_postprocess: - out = [] - for res, fn in zip(results, self.result_processors): - out.append(fn(res)) - else: - out = results - return out - - def enable_debug(f): @wraps(f) def wrapper(*args): print(*args) return f(*args) + return wrapper diff --git a/redisai/command_builder.py b/redisai/command_builder.py index eae464e..3087342 100644 --- a/redisai/command_builder.py +++ b/redisai/command_builder.py @@ -6,53 +6,65 @@ def loadbackend(identifier: AnyStr, path: AnyStr) -> Sequence: - return 'AI.CONFIG LOADBACKEND', identifier, path - - -def modelset(name: AnyStr, backend: str, device: str, data: ByteString, - batch: int, minbatch: int, tag: AnyStr, - inputs: Union[AnyStr, List[AnyStr]], - outputs: Union[AnyStr, List[AnyStr]]) -> Sequence: + return "AI.CONFIG LOADBACKEND", identifier, path + + +def modelset( + name: AnyStr, + backend: str, + device: str, + data: ByteString, + batch: int, + minbatch: int, + tag: AnyStr, + inputs: Union[AnyStr, List[AnyStr]], + outputs: Union[AnyStr, List[AnyStr]], +) -> Sequence: if device.upper() not in utils.allowed_devices: raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}") if backend.upper() not in utils.allowed_backends: raise ValueError(f"Backend not allowed. Use any from {utils.allowed_backends}") - args = ['AI.MODELSET', name, backend, device] + args = ["AI.MODELSET", name, backend, device] if batch is not None: - args += ['BATCHSIZE', batch] + args += ["BATCHSIZE", batch] if minbatch is not None: - args += ['MINBATCHSIZE', minbatch] + args += ["MINBATCHSIZE", minbatch] if tag is not None: - args += ['TAG', tag] - - if backend.upper() == 'TF': - if not(all((inputs, outputs))): - raise ValueError( - 'Require keyword arguments input and output for TF models') - args += ['INPUTS', *utils.listify(inputs)] - args += ['OUTPUTS', *utils.listify(outputs)] + args += ["TAG", tag] + + if backend.upper() == "TF": + if not (all((inputs, outputs))): + raise ValueError("Require keyword arguments input and output for TF models") + args += ["INPUTS", *utils.listify(inputs)] + args += ["OUTPUTS", *utils.listify(outputs)] chunk_size = 500 * 1024 * 1024 - data_chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] + data_chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)] # TODO: need a test case for this - args += ['BLOB', *data_chunks] + args += ["BLOB", *data_chunks] return args def modelget(name: AnyStr, meta_only=False) -> Sequence: - args = ['AI.MODELGET', name, 'META'] + args = ["AI.MODELGET", name, "META"] if not meta_only: - args.append('BLOB') + args.append("BLOB") return args def modeldel(name: AnyStr) -> Sequence: - return 'AI.MODELDEL', name + return "AI.MODELDEL", name def modelrun(name: AnyStr, inputs: List[AnyStr], outputs: List[AnyStr]) -> Sequence: - args = ('AI.MODELRUN', name, 'INPUTS', *utils.listify(inputs), 'OUTPUTS', - *utils.listify(outputs)) + args = ( + "AI.MODELRUN", + name, + "INPUTS", + *utils.listify(inputs), + "OUTPUTS", + *utils.listify(outputs), + ) return args @@ -60,71 +72,85 @@ def modelscan() -> Sequence: return ("AI._MODELSCAN",) -def tensorset(key: AnyStr, - tensor: Union[np.ndarray, list, tuple], - shape: Sequence[int] = None, - dtype: str = None) -> Sequence: +def tensorset( + key: AnyStr, + tensor: Union[np.ndarray, list, tuple], + shape: Sequence[int] = None, + dtype: str = None, +) -> Sequence: if np and isinstance(tensor, np.ndarray): dtype, shape, blob = utils.numpy2blob(tensor) - args = ['AI.TENSORSET', key, dtype, *shape, 'BLOB', blob] + args = ["AI.TENSORSET", key, dtype, *shape, "BLOB", blob] elif isinstance(tensor, (list, tuple)): try: dtype = utils.dtype_dict[dtype.lower()] except KeyError: - raise TypeError(f'``{dtype}`` is not supported by RedisAI. Currently ' - f'supported types are {list(utils.dtype_dict.keys())}') + raise TypeError( + f"``{dtype}`` is not supported by RedisAI. Currently " + f"supported types are {list(utils.dtype_dict.keys())}" + ) except AttributeError: - raise TypeError("tensorset() missing argument 'dtype' or " - "value of 'dtype' is None") + raise TypeError( + "tensorset() missing argument 'dtype' or value of 'dtype' is None" + ) if shape is None: shape = (len(tensor),) - args = ['AI.TENSORSET', key, dtype, *shape, 'VALUES', *tensor] + args = ["AI.TENSORSET", key, dtype, *shape, "VALUES", *tensor] else: - raise TypeError(f"``tensor`` argument must be a numpy array or a list or a " - f"tuple, but got {type(tensor)}") + raise TypeError( + f"``tensor`` argument must be a numpy array or a list or a " + f"tuple, but got {type(tensor)}" + ) return args -def tensorget(key: AnyStr, as_numpy: bool = True, - meta_only: bool = False) -> Sequence: - args = ['AI.TENSORGET', key, 'META'] +def tensorget(key: AnyStr, as_numpy: bool = True, meta_only: bool = False) -> Sequence: + args = ["AI.TENSORGET", key, "META"] if not meta_only: if as_numpy is True: - args.append('BLOB') + args.append("BLOB") else: - args.append('VALUES') + args.append("VALUES") return args def scriptset(name: AnyStr, device: str, script: str, tag: AnyStr = None) -> Sequence: if device.upper() not in utils.allowed_devices: raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}") - args = ['AI.SCRIPTSET', name, device] + args = ["AI.SCRIPTSET", name, device] if tag: - args += ['TAG', tag] + args += ["TAG", tag] args.append("SOURCE") args.append(script) return args def scriptget(name: AnyStr, meta_only=False) -> Sequence: - args = ['AI.SCRIPTGET', name, 'META'] + args = ["AI.SCRIPTGET", name, "META"] if not meta_only: - args.append('SOURCE') + args.append("SOURCE") return args def scriptdel(name: AnyStr) -> Sequence: - return 'AI.SCRIPTDEL', name - - -def scriptrun(name: AnyStr, - function: AnyStr, - inputs: Union[AnyStr, Sequence[AnyStr]], - outputs: Union[AnyStr, Sequence[AnyStr]] - ) -> Sequence: - args = ('AI.SCRIPTRUN', name, function, 'INPUTS', *utils.listify(inputs), 'OUTPUTS', - *utils.listify(outputs)) + return "AI.SCRIPTDEL", name + + +def scriptrun( + name: AnyStr, + function: AnyStr, + inputs: Union[AnyStr, Sequence[AnyStr]], + outputs: Union[AnyStr, Sequence[AnyStr]], +) -> Sequence: + args = ( + "AI.SCRIPTRUN", + name, + function, + "INPUTS", + *utils.listify(inputs), + "OUTPUTS", + *utils.listify(outputs), + ) return args @@ -133,8 +159,8 @@ def scriptscan() -> Sequence: def infoget(key: AnyStr) -> Sequence: - return 'AI.INFO', key + return "AI.INFO", key def inforeset(key: AnyStr) -> Sequence: - return 'AI.INFO', key, 'RESETSTAT' + return "AI.INFO", key, "RESETSTAT" diff --git a/redisai/dag.py b/redisai/dag.py new file mode 100644 index 0000000..1b60529 --- /dev/null +++ b/redisai/dag.py @@ -0,0 +1,94 @@ +from functools import partial +from typing import AnyStr, Union, Sequence, Any, List + +import numpy as np + +from redisai.postprocessor import Processor +from redisai import command_builder as builder + + +processor = Processor() + + +class Dag: + def __init__(self, load, persist, executor, readonly=False, postprocess=True): + self.result_processors = [] + self.enable_postprocess = True + if readonly: + if persist: + raise RuntimeError( + "READONLY requests cannot write (duh!) and should not " + "have PERSISTing values" + ) + self.commands = ["AI.DAGRUN_RO"] + else: + self.commands = ["AI.DAGRUN"] + if load: + if not isinstance(load, (list, tuple)): + self.commands += ["LOAD", 1, load] + else: + self.commands += ["LOAD", len(load), *load] + if persist: + if not isinstance(persist, (list, tuple)): + self.commands += ["PERSIST", 1, persist, "|>"] + else: + self.commands += ["PERSIST", len(persist), *persist, "|>"] + else: + self.commands.append("|>") + self.executor = executor + + def tensorset( + self, + key: AnyStr, + tensor: Union[np.ndarray, list, tuple], + shape: Sequence[int] = None, + dtype: str = None, + ) -> Any: + args = builder.tensorset(key, tensor, shape, dtype) + self.commands.extend(args) + self.commands.append("|>") + self.result_processors.append(bytes.decode) + return self + + def tensorget( + self, + key: AnyStr, + as_numpy: bool = True, + as_numpy_mutable: bool = False, + meta_only: bool = False, + ) -> Any: + args = builder.tensorget(key, as_numpy, as_numpy_mutable) + self.commands.extend(args) + self.commands.append("|>") + self.result_processors.append( + partial( + processor.tensorget, + as_numpy=as_numpy, + as_numpy_mutable=as_numpy_mutable, + meta_only=meta_only, + ) + ) + return self + + def modelrun( + self, + key: AnyStr, + inputs: Union[AnyStr, List[AnyStr]], + outputs: Union[AnyStr, List[AnyStr]], + ) -> Any: + args = builder.modelrun(key, inputs, outputs) + self.commands.extend(args) + self.commands.append("|>") + self.result_processors.append(bytes.decode) + return self + + def run(self): + commands = self.commands[:-1] # removing the last "|> + results = self.executor(*commands) + if self.enable_postprocess: + out = [] + for res, fn in zip(results, self.result_processors): + out.append(fn(res)) + else: + out = results + return out diff --git a/redisai/pipeline.py b/redisai/pipeline.py new file mode 100644 index 0000000..447f528 --- /dev/null +++ b/redisai/pipeline.py @@ -0,0 +1,58 @@ +import warnings +from functools import partial +from typing import AnyStr, Union, Sequence + +import numpy as np + +from redisai import command_builder as builder +import redis +from redisai.postprocessor import Processor + + +processor = Processor() + + +class Pipeline(redis.client.Pipeline): + def __init__(self, enable_postprocess, *args, **kwargs): + self.enable_postprocess = enable_postprocess + self.tensorget_processors = [] + self.tensorset_processors = [] + super().__init__(*args, **kwargs) + + def tensorget(self, key, as_numpy=True, as_numpy_mutable=False, meta_only=False): + self.tensorget_processors.append( + partial( + processor.tensorget, + as_numpy=as_numpy, + as_numpy_mutable=as_numpy_mutable, + meta_only=meta_only, + ) + ) + args = builder.tensorget(key, as_numpy, meta_only) + return self.execute_command(*args) + + def tensorset( + self, + key: AnyStr, + tensor: Union[np.ndarray, list, tuple], + shape: Sequence[int] = None, + dtype: str = None, + ) -> str: + args = builder.tensorset(key, tensor, shape, dtype) + return self.execute_command(*args) + + def _execute_transaction(self, *args, **kwargs): + res = super()._execute_transaction(*args, **kwargs) + for i in range(len(res)): + # tensorget will have minimum 4 values if meta_only = True + if isinstance(res[i], list) and len(res[i]) >= 4: + res[i] = self.tensorget_processors.pop(0)(res[i]) + return res + + def _execute_pipeline(self, *args, **kwargs): + res = super()._execute_pipeline(*args, **kwargs) + for i in range(len(res)): + # tensorget will have minimum 4 values if meta_only = True + if isinstance(res[i], list) and len(res[i]) >= 4: + res[i] = self.tensorget_processors.pop(0)(res[i]) + return res diff --git a/redisai/postprocessor.py b/redisai/postprocessor.py index e95f9b8..c37c1ef 100644 --- a/redisai/postprocessor.py +++ b/redisai/postprocessor.py @@ -6,12 +6,11 @@ def decoder(val): class Processor: - @staticmethod def modelget(res): resdict = utils.list2dict(res) - utils.recursive_bytetransform(resdict['inputs'], lambda x: x.decode()) - utils.recursive_bytetransform(resdict['outputs'], lambda x: x.decode()) + utils.recursive_bytetransform(resdict["inputs"], lambda x: x.decode()) + utils.recursive_bytetransform(resdict["outputs"], lambda x: x.decode()) return resdict @staticmethod @@ -29,12 +28,22 @@ def tensorget(res, as_numpy, as_numpy_mutable, meta_only): if meta_only is True: return rai_result elif as_numpy_mutable is True: - return utils.blob2numpy(rai_result['blob'], rai_result['shape'], rai_result['dtype'], mutable=True) + return utils.blob2numpy( + rai_result["blob"], + rai_result["shape"], + rai_result["dtype"], + mutable=True, + ) elif as_numpy is True: - return utils.blob2numpy(rai_result['blob'], rai_result['shape'], rai_result['dtype'], mutable=False) + return utils.blob2numpy( + rai_result["blob"], + rai_result["shape"], + rai_result["dtype"], + mutable=False, + ) else: - target = float if rai_result['dtype'] in ('FLOAT', 'DOUBLE') else int - utils.recursive_bytetransform(rai_result['values'], target) + target = float if rai_result["dtype"] in ("FLOAT", "DOUBLE") else int + utils.recursive_bytetransform(rai_result["values"], target) return rai_result @staticmethod @@ -52,7 +61,16 @@ def infoget(res): # These functions are only doing decoding on the output from redis decoder = staticmethod(decoder) -decoding_functions = ('loadbackend', 'modelset', 'modeldel', 'modelrun', 'tensorset', - 'scriptset', 'scriptdel', 'scriptrun', 'inforeset') +decoding_functions = ( + "loadbackend", + "modelset", + "modeldel", + "modelrun", + "tensorset", + "scriptset", + "scriptdel", + "scriptrun", + "inforeset", +) for fn in decoding_functions: setattr(Processor, fn, decoder) diff --git a/redisai/utils.py b/redisai/utils.py index 8becd72..3723bc5 100644 --- a/redisai/utils.py +++ b/redisai/utils.py @@ -3,21 +3,22 @@ dtype_dict = { - 'float': 'FLOAT', - 'double': 'DOUBLE', - 'float32': 'FLOAT', - 'float64': 'DOUBLE', - 'int8': 'INT8', - 'int16': 'INT16', - 'int32': 'INT32', - 'int64': 'INT64', - 'uint8': 'UINT8', - 'uint16': 'UINT16', - 'uint32': 'UINT32', - 'uint64': 'UINT64'} + "float": "FLOAT", + "double": "DOUBLE", + "float32": "FLOAT", + "float64": "DOUBLE", + "int8": "INT8", + "int16": "INT16", + "int32": "INT32", + "int64": "INT64", + "uint8": "UINT8", + "uint16": "UINT16", + "uint32": "UINT32", + "uint64": "UINT64", +} -allowed_devices = {'CPU', 'GPU'} -allowed_backends = {'TF', 'TFLITE', 'TORCH', 'ONNX'} +allowed_devices = {"CPU", "GPU"} +allowed_backends = {"TF", "TFLITE", "TORCH", "ONNX"} def numpy2blob(tensor: np.ndarray) -> tuple: @@ -31,12 +32,11 @@ def numpy2blob(tensor: np.ndarray) -> tuple: return dtype, shape, blob -def blob2numpy(value: ByteString, shape: Union[list, tuple], dtype: str, mutable: bool) -> np.ndarray: +def blob2numpy( + value: ByteString, shape: Union[list, tuple], dtype: str, mutable: bool +) -> np.ndarray: """Convert `BLOB` result from RedisAI to `np.ndarray`.""" - mm = { - 'FLOAT': 'float32', - 'DOUBLE': 'float64' - } + mm = {"FLOAT": "float32", "DOUBLE": "float64"} dtype = mm.get(dtype, dtype.lower()) if mutable: a = np.fromstring(value, dtype=dtype) @@ -53,7 +53,7 @@ def list2dict(lst): for i in range(0, len(lst), 2): key = lst[i].decode().lower() val = lst[i + 1] - if key != 'blob' and isinstance(val, bytes): + if key != "blob" and isinstance(val, bytes): val = val.decode() out[key] = val return out diff --git a/setup.py b/setup.py index 894c283..8cba692 100644 --- a/setup.py +++ b/setup.py @@ -2,30 +2,30 @@ from setuptools import setup, find_packages -with open('README.rst') as f: +with open("README.rst") as f: long_description = f.read() setup( - name='redisai', - version='1.0.1', - description='RedisAI Python Client', + name="redisai", + version="1.0.2", + description="RedisAI Python Client", long_description=long_description, - long_description_content_type='text/x-rst', - url='http://github.com/RedisAI/redisai-py', - author='RedisLabs', - author_email='oss@redislabs.com', + long_description_content_type="text/x-rst", + url="http://github.com/RedisAI/redisai-py", + author="RedisLabs", + author_email="oss@redislabs.com", packages=find_packages(), - install_requires=['redis', 'hiredis', 'numpy'], - python_requires='>=3.6', + install_requires=["redis", "hiredis", "numpy"], + python_requires=">=3.6", classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: BSD License', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3 :: Only', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Topic :: Database' - ] + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: BSD License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Topic :: Database", + ], ) diff --git a/test/test.py b/test/test.py index d76601a..1dcf8fa 100644 --- a/test/test.py +++ b/test/test.py @@ -9,6 +9,8 @@ DEBUG = False +tf_graph = "graph.pb" +torch_graph = "pt-minimal.pt" class Capturing(list): @@ -19,16 +21,17 @@ def __enter__(self): def __exit__(self, *args): self.extend(self._stringio.getvalue().splitlines()) - del self._stringio # free up some memory + del self._stringio # free up some memory sys.stdout = self._stdout -MODEL_DIR = os.path.dirname(os.path.abspath(__file__)) + '/testdata' +MODEL_DIR = os.path.dirname(os.path.abspath(__file__)) + "/testdata" script = r""" def bar(a, b): return a + b """ + class RedisAITestBase(TestCase): def setUp(self): super().setUp() @@ -39,207 +42,254 @@ def get_client(self, debug=DEBUG): class ClientTestCase(RedisAITestBase): - def test_set_non_numpy_tensor(self): con = self.get_client() - con.tensorset('x', (2, 3, 4, 5), dtype='float') - result = con.tensorget('x', as_numpy=False) - self.assertEqual([2, 3, 4, 5], result['values']) - self.assertEqual([4], result['shape']) - - con.tensorset('x', (2, 3, 4, 5), dtype='float64') - result = con.tensorget('x', as_numpy=False) - self.assertEqual([2, 3, 4, 5], result['values']) - self.assertEqual([4], result['shape']) - self.assertEqual('DOUBLE', result['dtype']) - - con.tensorset('x', (2, 3, 4, 5), dtype='int16', shape=(2, 2)) - result = con.tensorget('x', as_numpy=False) - self.assertEqual([2, 3, 4, 5], result['values']) - self.assertEqual([2, 2], result['shape']) + con.tensorset("x", (2, 3, 4, 5), dtype="float") + result = con.tensorget("x", as_numpy=False) + self.assertEqual([2, 3, 4, 5], result["values"]) + self.assertEqual([4], result["shape"]) + + con.tensorset("x", (2, 3, 4, 5), dtype="float64") + result = con.tensorget("x", as_numpy=False) + self.assertEqual([2, 3, 4, 5], result["values"]) + self.assertEqual([4], result["shape"]) + self.assertEqual("DOUBLE", result["dtype"]) + + con.tensorset("x", (2, 3, 4, 5), dtype="int16", shape=(2, 2)) + result = con.tensorget("x", as_numpy=False) + self.assertEqual([2, 3, 4, 5], result["values"]) + self.assertEqual([2, 2], result["shape"]) with self.assertRaises(TypeError): - con.tensorset('x', (2, 3, 4, 5), dtype='wrongtype', shape=(2, 2)) - con.tensorset('x', (2, 3, 4, 5), dtype='int8', shape=(2, 2)) - result = con.tensorget('x', as_numpy=False) - self.assertEqual('INT8', result['dtype']) - self.assertEqual([2, 3, 4, 5], result['values']) - self.assertEqual([2, 2], result['shape']) - self.assertIn('values', result) + con.tensorset("x", (2, 3, 4, 5), dtype="wrongtype", shape=(2, 2)) + con.tensorset("x", (2, 3, 4, 5), dtype="int8", shape=(2, 2)) + result = con.tensorget("x", as_numpy=False) + self.assertEqual("INT8", result["dtype"]) + self.assertEqual([2, 3, 4, 5], result["values"]) + self.assertEqual([2, 2], result["shape"]) + self.assertIn("values", result) with self.assertRaises(TypeError): - con.tensorset('x') + con.tensorset("x") con.tensorset(1) def test_tensorget_meta(self): con = self.get_client() - con.tensorset('x', (2, 3, 4, 5), dtype='float') - result = con.tensorget('x', meta_only=True) - self.assertNotIn('values', result) - self.assertEqual([4], result['shape']) + con.tensorset("x", (2, 3, 4, 5), dtype="float") + result = con.tensorget("x", meta_only=True) + self.assertNotIn("values", result) + self.assertEqual([4], result["shape"]) def test_numpy_tensor(self): con = self.get_client() input_array = np.array([2, 3], dtype=np.float32) - con.tensorset('x', input_array) - values = con.tensorget('x') + con.tensorset("x", input_array) + values = con.tensorget("x") self.assertEqual(values.dtype, np.float32) input_array = np.array([2, 3], dtype=np.float64) - con.tensorset('x', input_array) - values = con.tensorget('x') + con.tensorset("x", input_array) + values = con.tensorget("x") self.assertEqual(values.dtype, np.float64) input_array = np.array([2, 3]) - con.tensorset('x', input_array) - values = con.tensorget('x') + con.tensorset("x", input_array) + values = con.tensorget("x") self.assertTrue(np.allclose([2, 3], values)) self.assertEqual(values.dtype, np.int64) self.assertEqual(values.shape, (2,)) self.assertTrue((np.allclose(values, input_array))) - ret = con.tensorset('x', values) - self.assertEqual(ret, 'OK') + ret = con.tensorset("x", values) + self.assertEqual(ret, "OK") # By default tensorget returns immutable, unless as_numpy_mutable is set as True - ret = con.tensorget('x') + ret = con.tensorget("x") self.assertRaises(ValueError, np.put, ret, 0, 1) - ret = con.tensorget('x', as_numpy_mutable=True) + ret = con.tensorget("x", as_numpy_mutable=True) np.put(ret, 0, 1) self.assertEqual(ret[0], 1) - stringarr = np.array('dummy') + stringarr = np.array("dummy") with self.assertRaises(TypeError): - con.tensorset('trying', stringarr) + con.tensorset("trying", stringarr) def test_modelset_errors(self): - model_path = os.path.join(MODEL_DIR, 'graph.pb') + model_path = os.path.join(MODEL_DIR, tf_graph) model_pb = load_model(model_path) con = self.get_client() with self.assertRaises(ValueError): - con.modelset('m', 'tf', 'wrongdevice', model_pb, - inputs=['a', 'b'], outputs=['mul'], tag='v1.0') + con.modelset( + "m", + "tf", + "wrongdevice", + model_pb, + inputs=["a", "b"], + outputs=["mul"], + tag="v1.0", + ) with self.assertRaises(ValueError): - con.modelset('m', 'wrongbackend', 'cpu', model_pb, - inputs=['a', 'b'], outputs=['mul'], tag='v1.0') + con.modelset( + "m", + "wrongbackend", + "cpu", + model_pb, + inputs=["a", "b"], + outputs=["mul"], + tag="v1.0", + ) def test_modelget_meta(self): - model_path = os.path.join(MODEL_DIR, 'graph.pb') + model_path = os.path.join(MODEL_DIR, tf_graph) model_pb = load_model(model_path) con = self.get_client() - con.modelset('m', 'tf', 'cpu', model_pb, - inputs=['a', 'b'], outputs=['mul'], tag='v1.0') - model = con.modelget('m', meta_only=True) - self.assertEqual(model, {'backend': 'TF', 'batchsize': 0, 'device': 'cpu', 'inputs': ['a', 'b'], 'minbatchsize': 0, 'outputs': ['mul'], 'tag': 'v1.0'}) - + con.modelset( + "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.0" + ) + model = con.modelget("m", meta_only=True) + self.assertEqual( + model, + { + "backend": "TF", + "batchsize": 0, + "device": "cpu", + "inputs": ["a", "b"], + "minbatchsize": 0, + "outputs": ["mul"], + "tag": "v1.0", + }, + ) + def test_modelrun_non_list_input_output(self): - model_path = os.path.join(MODEL_DIR, 'graph.pb') + model_path = os.path.join(MODEL_DIR, tf_graph) model_pb = load_model(model_path) con = self.get_client() - con.modelset('m', 'tf', 'cpu', model_pb, - inputs=['a', 'b'], outputs=['mul'], tag='v1.7') - con.tensorset('a', (2, 3), dtype='float') - con.tensorset('b', (2, 3), dtype='float') - ret = con.modelrun('m', ['a', 'b'], 'out') - self.assertEqual(ret, 'OK') + con.modelset( + "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.7" + ) + con.tensorset("a", (2, 3), dtype="float") + con.tensorset("b", (2, 3), dtype="float") + ret = con.modelrun("m", ["a", "b"], "out") + self.assertEqual(ret, "OK") def test_nonasciichar(self): - nonascii = 'ĉ' - model_path = os.path.join(MODEL_DIR, 'graph.pb') + nonascii = "ĉ" + model_path = os.path.join(MODEL_DIR, tf_graph) model_pb = load_model(model_path) con = self.get_client() - con.modelset('m' + nonascii, 'tf', 'cpu', model_pb, - inputs=['a', 'b'], outputs=['mul'], tag='v1.0') - con.tensorset('a' + nonascii, (2, 3), dtype='float') - con.tensorset('b', (2, 3), dtype='float') - con.modelrun('m' + nonascii, ['a' + nonascii, 'b'], ['c' + nonascii]) - tensor = con.tensorget('c' + nonascii) - self.assertTrue((np.allclose(tensor, [4., 9.]))) + con.modelset( + "m" + nonascii, + "tf", + "cpu", + model_pb, + inputs=["a", "b"], + outputs=["mul"], + tag="v1.0", + ) + con.tensorset("a" + nonascii, (2, 3), dtype="float") + con.tensorset("b", (2, 3), dtype="float") + con.modelrun("m" + nonascii, ["a" + nonascii, "b"], ["c" + nonascii]) + tensor = con.tensorget("c" + nonascii) + self.assertTrue((np.allclose(tensor, [4.0, 9.0]))) def test_run_tf_model(self): - model_path = os.path.join(MODEL_DIR, 'graph.pb') - bad_model_path = os.path.join(MODEL_DIR, 'pt-minimal.pt') + model_path = os.path.join(MODEL_DIR, tf_graph) + bad_model_path = os.path.join(MODEL_DIR, torch_graph) model_pb = load_model(model_path) wrong_model_pb = load_model(bad_model_path) con = self.get_client() - con.modelset('m', 'tf', 'cpu', model_pb, - inputs=['a', 'b'], outputs=['mul'], tag='v1.0') - con.modeldel('m') - self.assertRaises(ResponseError, con.modelget, 'm') - con.modelset('m', 'tf', 'cpu', model_pb, - inputs=['a', 'b'], outputs='mul', tag='v1.0') + con.modelset( + "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.0" + ) + con.modeldel("m") + self.assertRaises(ResponseError, con.modelget, "m") + con.modelset( + "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs="mul", tag="v1.0" + ) # wrong model - self.assertRaises(ResponseError, - con.modelset, 'm', 'tf', 'cpu', - wrong_model_pb, - inputs=['a', 'b'], outputs=['mul']) + self.assertRaises( + ResponseError, + con.modelset, + "m", + "tf", + "cpu", + wrong_model_pb, + inputs=["a", "b"], + outputs=["mul"], + ) # missing inputs/outputs - self.assertRaises(ValueError, - con.modelset, 'm', 'tf', 'cpu', - wrong_model_pb) + self.assertRaises(ValueError, con.modelset, "m", "tf", "cpu", wrong_model_pb) # wrong backend - self.assertRaises(ResponseError, - con.modelset, 'm', 'torch', 'cpu', - model_pb, - inputs=['a', 'b'], outputs=['mul']) - - con.tensorset('a', (2, 3), dtype='float') - con.tensorset('b', (2, 3), dtype='float') - con.modelrun('m', ['a', 'b'], ['c']) - tensor = con.tensorget('c') + self.assertRaises( + ResponseError, + con.modelset, + "m", + "torch", + "cpu", + model_pb, + inputs=["a", "b"], + outputs=["mul"], + ) + + con.tensorset("a", (2, 3), dtype="float") + con.tensorset("b", (2, 3), dtype="float") + con.modelrun("m", ["a", "b"], ["c"]) + tensor = con.tensorget("c") self.assertTrue(np.allclose([4, 9], tensor)) - model_det = con.modelget('m') - self.assertTrue(model_det['backend'] == 'TF') - self.assertTrue(model_det['device'] == 'cpu') # TODO; RedisAI returns small letter - self.assertTrue(model_det['tag'] == 'v1.0') - con.modeldel('m') - self.assertRaises(ResponseError, con.modelget, 'm') + model_det = con.modelget("m") + self.assertTrue(model_det["backend"] == "TF") + self.assertTrue( + model_det["device"] == "cpu" + ) # TODO; RedisAI returns small letter + self.assertTrue(model_det["tag"] == "v1.0") + con.modeldel("m") + self.assertRaises(ResponseError, con.modelget, "m") def test_scripts(self): con = self.get_client() - self.assertRaises(ResponseError, con.scriptset, - 'ket', 'cpu', 'return 1') - con.scriptset('ket', 'cpu', script) - con.tensorset('a', (2, 3), dtype='float') - con.tensorset('b', (2, 3), dtype='float') + self.assertRaises(ResponseError, con.scriptset, "ket", "cpu", "return 1") + con.scriptset("ket", "cpu", script) + con.tensorset("a", (2, 3), dtype="float") + con.tensorset("b", (2, 3), dtype="float") # try with bad arguments: - self.assertRaises(ResponseError, - con.scriptrun, 'ket', 'bar', inputs=['a'], outputs=['c']) - con.scriptrun('ket', 'bar', inputs=['a', 'b'], outputs=['c']) - tensor = con.tensorget('c', as_numpy=False) - self.assertEqual([4, 6], tensor['values']) - script_det = con.scriptget('ket') - self.assertTrue(script_det['device'] == 'cpu') - self.assertTrue(script_det['source'] == script) - script_det = con.scriptget('ket', meta_only=True) - self.assertTrue(script_det['device'] == 'cpu') - self.assertNotIn('source', script_det) - con.scriptdel('ket') - self.assertRaises(ResponseError, con.scriptget, 'ket') + self.assertRaises( + ResponseError, con.scriptrun, "ket", "bar", inputs=["a"], outputs=["c"] + ) + con.scriptrun("ket", "bar", inputs=["a", "b"], outputs=["c"]) + tensor = con.tensorget("c", as_numpy=False) + self.assertEqual([4, 6], tensor["values"]) + script_det = con.scriptget("ket") + self.assertTrue(script_det["device"] == "cpu") + self.assertTrue(script_det["source"] == script) + script_det = con.scriptget("ket", meta_only=True) + self.assertTrue(script_det["device"] == "cpu") + self.assertNotIn("source", script_det) + con.scriptdel("ket") + self.assertRaises(ResponseError, con.scriptget, "ket") def test_run_onnxml_model(self): - mlmodel_path = os.path.join(MODEL_DIR, 'boston.onnx') + mlmodel_path = os.path.join(MODEL_DIR, "boston.onnx") onnxml_model = load_model(mlmodel_path) con = self.get_client() - con.modelset("onnx_model", 'onnx', 'cpu', onnxml_model) + con.modelset("onnx_model", "onnx", "cpu", onnxml_model) tensor = np.ones((1, 13)).astype(np.float32) con.tensorset("input", tensor) con.modelrun("onnx_model", ["input"], ["output"]) # tests `convert_to_num` outtensor = con.tensorget("output", as_numpy=False) - self.assertEqual(int(float(outtensor['values'][0])), 24) + self.assertEqual(int(float(outtensor["values"][0])), 24) def test_run_onnxdl_model(self): # A PyTorch model that finds the square - dlmodel_path = os.path.join(MODEL_DIR, 'findsquare.onnx') + dlmodel_path = os.path.join(MODEL_DIR, "findsquare.onnx") onnxdl_model = load_model(dlmodel_path) con = self.get_client() - con.modelset("onnx_model", 'onnx', 'cpu', onnxdl_model) + con.modelset("onnx_model", "onnx", "cpu", onnxdl_model) tensor = np.array((2,)).astype(np.float32) con.tensorset("input", tensor) con.modelrun("onnx_model", ["input"], ["output"]) @@ -247,154 +297,169 @@ def test_run_onnxdl_model(self): self.assertTrue(np.allclose(outtensor, [4.0])) def test_run_pytorch_model(self): - model_path = os.path.join(MODEL_DIR, 'pt-minimal.pt') + model_path = os.path.join(MODEL_DIR, torch_graph) ptmodel = load_model(model_path) con = self.get_client() - con.modelset("pt_model", 'torch', 'cpu', ptmodel, tag='v1.0') - con.tensorset('a', [2, 3, 2, 3], shape=(2, 2), dtype='float') - con.tensorset('b', [2, 3, 2, 3], shape=(2, 2), dtype='float') + con.modelset("pt_model", "torch", "cpu", ptmodel, tag="v1.0") + con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") + con.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") con.modelrun("pt_model", ["a", "b"], ["output"]) - output = con.tensorget('output', as_numpy=False) - self.assertTrue(np.allclose(output['values'], [4, 6, 4, 6])) + output = con.tensorget("output", as_numpy=False) + self.assertTrue(np.allclose(output["values"], [4, 6, 4, 6])) def test_run_tflite_model(self): - model_path = os.path.join(MODEL_DIR, 'mnist_model_quant.tflite') + model_path = os.path.join(MODEL_DIR, "mnist_model_quant.tflite") tflmodel = load_model(model_path) con = self.get_client() - con.modelset("tfl_model", 'tflite', 'cpu', tflmodel) + con.modelset("tfl_model", "tflite", "cpu", tflmodel) img = np.random.random((1, 1, 28, 28)).astype(np.float) - con.tensorset('img', img) + con.tensorset("img", img) con.modelrun("tfl_model", ["img"], ["output1", "output2"]) - output = con.tensorget('output1') + output = con.tensorget("output1") self.assertTrue(np.allclose(output, [8])) def test_info(self): - model_path = os.path.join(MODEL_DIR, 'graph.pb') + model_path = os.path.join(MODEL_DIR, tf_graph) model_pb = load_model(model_path) con = self.get_client() - con.modelset('m', 'tf', 'cpu', model_pb, - inputs=['a', 'b'], outputs=['mul']) - first_info = con.infoget('m') - expected = {'key': 'm', 'type': 'MODEL', 'backend': 'TF', 'device': 'cpu', - 'tag': '', 'duration': 0, 'samples': 0, 'calls': 0, 'errors': 0} + con.modelset("m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"]) + first_info = con.infoget("m") + expected = { + "key": "m", + "type": "MODEL", + "backend": "TF", + "device": "cpu", + "tag": "", + "duration": 0, + "samples": 0, + "calls": 0, + "errors": 0, + } self.assertEqual(first_info, expected) - con.tensorset('a', (2, 3), dtype='float') - con.tensorset('b', (2, 3), dtype='float') - con.modelrun('m', ['a', 'b'], ['c']) - con.modelrun('m', ['a', 'b'], ['c']) - second_info = con.infoget('m') - self.assertEqual(second_info['calls'], 2) # 2 model runs - con.inforeset('m') - third_info = con.infoget('m') + con.tensorset("a", (2, 3), dtype="float") + con.tensorset("b", (2, 3), dtype="float") + con.modelrun("m", ["a", "b"], ["c"]) + con.modelrun("m", ["a", "b"], ["c"]) + second_info = con.infoget("m") + self.assertEqual(second_info["calls"], 2) # 2 model runs + con.inforeset("m") + third_info = con.infoget("m") self.assertEqual(first_info, third_info) # before modelrun and after reset def test_model_scan(self): - model_path = os.path.join(MODEL_DIR, 'graph.pb') + model_path = os.path.join(MODEL_DIR, tf_graph) model_pb = load_model(model_path) con = self.get_client() - con.modelset('m', 'tf', 'cpu', model_pb, - inputs=['a', 'b'], outputs=['mul'], tag='v1.2') - model_path = os.path.join(MODEL_DIR, 'pt-minimal.pt') + con.modelset( + "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.2" + ) + model_path = os.path.join(MODEL_DIR, torch_graph) ptmodel = load_model(model_path) con = self.get_client() # TODO: RedisAI modelscan issue - con.modelset("pt_model", 'torch', 'cpu', ptmodel) + con.modelset("pt_model", "torch", "cpu", ptmodel) mlist = con.modelscan() - self.assertEqual(mlist, [['pt_model', ''], ['m', 'v1.2']]) + self.assertEqual(mlist, [["pt_model", ""], ["m", "v1.2"]]) def test_script_scan(self): con = self.get_client() - con.scriptset('ket1', 'cpu', script, tag='v1.0') - con.scriptset('ket2', 'cpu', script) + con.scriptset("ket1", "cpu", script, tag="v1.0") + con.scriptset("ket2", "cpu", script) slist = con.scriptscan() - self.assertEqual(slist, [['ket1', 'v1.0'], ['ket2', '']]) + self.assertEqual(slist, [["ket1", "v1.0"], ["ket2", ""]]) def test_debug(self): con = self.get_client(debug=True) with Capturing() as output: - con.tensorset('x', (2, 3, 4, 5), dtype='float') - self.assertEqual(['AI.TENSORSET x FLOAT 4 VALUES 2 3 4 5'], output) + con.tensorset("x", (2, 3, 4, 5), dtype="float") + self.assertEqual(["AI.TENSORSET x FLOAT 4 VALUES 2 3 4 5"], output) class DagTestCase(RedisAITestBase): def setUp(self): super().setUp() con = self.get_client() - model_path = os.path.join(MODEL_DIR, 'pt-minimal.pt') + model_path = os.path.join(MODEL_DIR, torch_graph) ptmodel = load_model(model_path) - con.modelset("pt_model", 'torch', 'cpu', ptmodel, tag='v7.0') + con.modelset("pt_model", "torch", "cpu", ptmodel, tag="v7.0") def test_dagrun_with_load(self): con = self.get_client() - con.tensorset('a', [2, 3, 2, 3], shape=(2, 2), dtype='float') + con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") - dag = con.dag(load='a') - dag.tensorset('b', [2, 3, 2, 3], shape=(2, 2), dtype='float') + dag = con.dag(load="a") + dag.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") dag.modelrun("pt_model", ["a", "b"], ["output"]) - dag.tensorget('output') + dag.tensorget("output") result = dag.run() - expected = ['OK', 'OK', np.array([[4., 6.], [4., 6.]], dtype=np.float32)] + expected = ["OK", "OK", np.array([[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] self.assertTrue(np.allclose(expected.pop(), result.pop())) self.assertEqual(expected, result) - self.assertRaises(ResponseError, con.tensorget, 'b') + self.assertRaises(ResponseError, con.tensorget, "b") def test_dagrun_with_persist(self): con = self.get_client() with self.assertRaises(ResponseError): - dag = con.dag(persist='wrongkey') - dag.tensorset('a', [2, 3, 2, 3], shape=(2, 2), dtype='float').run() + dag = con.dag(persist="wrongkey") + dag.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float").run() - dag = con.dag(persist=['b']) - dag.tensorset('a', [2, 3, 2, 3], shape=(2, 2), dtype='float') - dag.tensorset('b', [2, 3, 2, 3], shape=(2, 2), dtype='float') - dag.tensorget('b') + dag = con.dag(persist=["b"]) + dag.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") + dag.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") + dag.tensorget("b") result = dag.run() - b = con.tensorget('b') + b = con.tensorget("b") self.assertTrue(np.allclose(b, result[-1])) self.assertEqual(b.dtype, np.float32) self.assertEqual(len(result), 3) def test_dagrun_calling_on_return(self): con = self.get_client() - con.tensorset('a', [2, 3, 2, 3], shape=(2, 2), dtype='float') - result = con.\ - dag(load='a').\ - tensorset('b', [2, 3, 2, 3], shape=(2, 2), dtype='float').\ - modelrun("pt_model", ["a", "b"], ["output"]).\ - tensorget('output').\ - run() - expected = ['OK', 'OK', np.array([[4., 6.], [4., 6.]], dtype=np.float32)] + con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") + result = ( + con.dag(load="a") + .tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") + .modelrun("pt_model", ["a", "b"], ["output"]) + .tensorget("output") + .run() + ) + expected = ["OK", "OK", np.array([[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] self.assertTrue(np.allclose(expected.pop(), result.pop())) self.assertEqual(expected, result) def test_dagrun_without_load_and_persist(self): con = self.get_client() - dag = con.dag(load='wrongkey') + dag = con.dag(load="wrongkey") with self.assertRaises(ResponseError): - dag.tensorget('wrongkey').run() + dag.tensorget("wrongkey").run() dag = con.dag() - dag.tensorset('a', [2, 3, 2, 3], shape=(2, 2), dtype='float') - dag.tensorset('b', [2, 3, 2, 3], shape=(2, 2), dtype='float') + dag.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") + dag.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") dag.modelrun("pt_model", ["a", "b"], ["output"]) - dag.tensorget('output') + dag.tensorget("output") result = dag.run() - expected = ['OK', 'OK', 'OK', np.array([[4., 6.], [4., 6.]], dtype=np.float32)] + expected = [ + "OK", + "OK", + "OK", + np.array([[4.0, 6.0], [4.0, 6.0]], dtype=np.float32), + ] self.assertTrue(np.allclose(expected.pop(), result.pop())) self.assertEqual(expected, result) def test_dagrun_with_load_and_persist(self): con = self.get_client() - con.tensorset('a', [2, 3, 2, 3], shape=(2, 2), dtype='float') - con.tensorset('b', [2, 3, 2, 3], shape=(2, 2), dtype='float') - dag = con.dag(load=['a', 'b'], persist='output') + con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") + con.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") + dag = con.dag(load=["a", "b"], persist="output") dag.modelrun("pt_model", ["a", "b"], ["output"]) - dag.tensorget('output') + dag.tensorget("output") result = dag.run() - expected = ['OK', np.array([[4., 6.], [4., 6.]], dtype=np.float32)] - result_outside_dag = con.tensorget('output') + expected = ["OK", np.array([[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] + result_outside_dag = con.tensorget("output") self.assertTrue(np.allclose(expected.pop(), result.pop())) result = dag.run() self.assertTrue(np.allclose(result_outside_dag, result.pop())) @@ -402,29 +467,34 @@ def test_dagrun_with_load_and_persist(self): def test_dagrunRO(self): con = self.get_client() - con.tensorset('a', [2, 3, 2, 3], shape=(2, 2), dtype='float') - con.tensorset('b', [2, 3, 2, 3], shape=(2, 2), dtype='float') + con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") + con.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") with self.assertRaises(RuntimeError): - con.dag(load=['a', 'b'], persist='output', readonly=True) - dag = con.dag(load=['a', 'b'], readonly=True) + con.dag(load=["a", "b"], persist="output", readonly=True) + dag = con.dag(load=["a", "b"], readonly=True) dag.modelrun("pt_model", ["a", "b"], ["output"]) - dag.tensorget('output') + dag.tensorget("output") result = dag.run() - expected = ['OK', np.array([[4., 6.], [4., 6.]], dtype=np.float32)] + expected = ["OK", np.array([[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] self.assertTrue(np.allclose(expected.pop(), result.pop())) class PipelineTest(RedisAITestBase): - def test_pipeline_non_transaction(self): con = self.get_client() - arr = np.array([[2., 3.], [2., 3.]], dtype=np.float32) + arr = np.array([[2.0, 3.0], [2.0, 3.0]], dtype=np.float32) pipe = con.pipeline(transaction=False) - pipe = pipe.tensorset('a', arr).set('native', 1) - pipe = pipe.tensorget('a', as_numpy=False) - pipe = pipe.tensorget('a', as_numpy=True).tensorget('a', meta_only=True) + pipe = pipe.tensorset("a", arr).set("native", 1) + pipe = pipe.tensorget("a", as_numpy=False) + pipe = pipe.tensorget("a", as_numpy=True).tensorget("a", meta_only=True) result = pipe.execute() - expected = [b'OK', True, {'dtype': 'FLOAT', 'shape': [2, 2], 'values': [2.0, 3.0, 2.0, 3.0]}, arr, {'dtype': 'FLOAT', 'shape': [2, 2]}] + expected = [ + b"OK", + True, + {"dtype": "FLOAT", "shape": [2, 2], "values": [2.0, 3.0, 2.0, 3.0]}, + arr, + {"dtype": "FLOAT", "shape": [2, 2]}, + ] for res, exp in zip(result, expected): if isinstance(res, np.ndarray): self.assertTrue(np.allclose(exp, res)) @@ -433,17 +503,21 @@ def test_pipeline_non_transaction(self): def test_pipeline_transaction(self): con = self.get_client() - arr = np.array([[2., 3.], [2., 3.]], dtype=np.float32) + arr = np.array([[2.0, 3.0], [2.0, 3.0]], dtype=np.float32) pipe = con.pipeline(transaction=True) - pipe = pipe.tensorset('a', arr).set('native', 1) - pipe = pipe.tensorget('a', as_numpy=False) - pipe = pipe.tensorget('a', as_numpy=True).tensorget('a', meta_only=True) + pipe = pipe.tensorset("a", arr).set("native", 1) + pipe = pipe.tensorget("a", as_numpy=False) + pipe = pipe.tensorget("a", as_numpy=True).tensorget("a", meta_only=True) result = pipe.execute() - expected = [b'OK', True, {'dtype': 'FLOAT', 'shape': [2, 2], 'values': [2.0, 3.0, 2.0, 3.0]}, arr, - {'dtype': 'FLOAT', 'shape': [2, 2]}] + expected = [ + b"OK", + True, + {"dtype": "FLOAT", "shape": [2, 2], "values": [2.0, 3.0, 2.0, 3.0]}, + arr, + {"dtype": "FLOAT", "shape": [2, 2]}, + ] for res, exp in zip(result, expected): if isinstance(res, np.ndarray): self.assertTrue(np.allclose(exp, res)) else: self.assertEqual(res, exp) -