fix: Special-case suffix requests in obstore backend to support Azure #2994
Conversation
I'm fine merging this without tests, because I think writing tests for this would be kind of cumbersome in our test suite as it is today (let me know if I'm wrong here). Longer term we should figure out how to make it easy to test this kind of thing.
Indeed I'm not entirely sure how to test it. We'd need to at least mock a backend that doesn't support suffix requests. Maybe that wouldn't be too hard to do?
There's a docker container for the Azurite storage emulator that supports most non-auth things. However, I think since the test suite doesn't already use docker it's probably not worth setting up just for this.
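A lighter-weight option might be a small in-process store that simply refuses suffix requests, so the fallback path can be exercised without Docker. A minimal sketch, assuming the zarr-python 3.x store API (the class name, base class, and get signature are assumptions, not code from this PR):

from zarr.abc.store import SuffixByteRequest
from zarr.storage import MemoryStore


class NoSuffixMemoryStore(MemoryStore):
    """Hypothetical test double: an in-memory store that rejects suffix
    byte-range requests, mimicking Azure's behaviour."""

    async def get(self, key, prototype, byte_range=None):
        if isinstance(byte_range, SuffixByteRequest):
            # Simulate Azure: no "last N bytes" requests allowed.
            raise NotImplementedError("suffix range requests are not supported")
        return await super().get(key, prototype, byte_range=byte_range)

A sharded array written to such a store should then hit the same code path that fails on Azure today.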
I can't test this myself, but I trust that this fix resolves the linked issue. If that turns out to be wrong, then it's all the more reason to beef up our remote store testing infrastructure in a separate PR.
FWIW I didn't test this either. Maybe @lsim-aegeri can test from this branch?
I will test later today!
I'm afraid I'm still getting the error. I double checked the file you modified in my .venv and confirmed I am using a version with the changes you made. Current traceback:

---------------------------------------------------------------------------
NotSupportedError Traceback (most recent call last)
Cell In[46], line 51
48 ds_n = xr.open_zarr(objstore_xr, consolidated=False)
50 # However, I get the error when loading the chunks into memory
---> 51 ds_n.compute()
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/dataset.py:714, in Dataset.compute(self, **kwargs)
690 """Manually trigger loading and/or computation of this dataset's data
691 from disk or a remote source into memory and return a new dataset.
692 Unlike load, the original dataset is left unaltered.
(...) 711 dask.compute
712 """
713 new = self.copy(deep=False)
--> 714 return new.load(**kwargs)
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/dataset.py:541, in Dataset.load(self, **kwargs)
538 chunkmanager = get_chunked_array_type(*lazy_data.values())
540 # evaluate all the chunked arrays simultaneously
--> 541 evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
542 *lazy_data.values(), **kwargs
543 )
545 for k, data in zip(lazy_data, evaluated_data, strict=False):
546 self.variables[k].data = data
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/namedarray/daskmanager.py:85, in DaskManager.compute(self, *data, **kwargs)
80 def compute(
81 self, *data: Any, **kwargs: Any
82 ) -> tuple[np.ndarray[Any, _DType_co], ...]:
83 from dask.array import compute
---> 85 return compute(*data, **kwargs)
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/dask/base.py:656, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
653 postcomputes.append(x.__dask_postcompute__())
655 with shorten_traceback():
--> 656 results = schedule(dsk, keys, **kwargs)
658 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/indexing.py:574, in ImplicitToExplicitIndexingAdapter.__array__(self, dtype, copy)
570 def __array__(
571 self, dtype: np.typing.DTypeLike = None, /, *, copy: bool | None = None
572 ) -> np.ndarray:
573 if Version(np.__version__) >= Version("2.0.0"):
--> 574 return np.asarray(self.get_duck_array(), dtype=dtype, copy=copy)
575 else:
576 return np.asarray(self.get_duck_array(), dtype=dtype)
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/indexing.py:579, in ImplicitToExplicitIndexingAdapter.get_duck_array(self)
578 def get_duck_array(self):
--> 579 return self.array.get_duck_array()
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/indexing.py:790, in CopyOnWriteArray.get_duck_array(self)
789 def get_duck_array(self):
--> 790 return self.array.get_duck_array()
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/indexing.py:653, in LazilyIndexedArray.get_duck_array(self)
649 array = apply_indexer(self.array, self.key)
650 else:
651 # If the array is not an ExplicitlyIndexedNDArrayMixin,
652 # it may wrap a BackendArray so use its __getitem__
--> 653 array = self.array[self.key]
655 # self.array[self.key] is now a numpy array when
656 # self.array is a BackendArray subclass
657 # and self.key is BasicIndexer((slice(None, None, None),))
658 # so we need the explicit check for ExplicitlyIndexed
659 if isinstance(array, ExplicitlyIndexed):
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/backends/zarr.py:223, in ZarrArrayWrapper.__getitem__(self, key)
221 elif isinstance(key, indexing.OuterIndexer):
222 method = self._oindex
--> 223 return indexing.explicit_indexing_adapter(
224 key, array.shape, indexing.IndexingSupport.VECTORIZED, method
225 )
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/indexing.py:1014, in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
992 """Support explicit indexing by delegating to a raw indexing method.
993
994 Outer and/or vectorized indexers are supported by indexing a second time
(...) 1011 Indexing result, in the form of a duck numpy-array.
1012 """
1013 raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
-> 1014 result = raw_indexing_method(raw_key.tuple)
1015 if numpy_indices.tuple:
1016 # index the loaded duck array
1017 indexable = as_indexable(result)
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/backends/zarr.py:213, in ZarrArrayWrapper._getitem(self, key)
212 def _getitem(self, key):
--> 213 return self._array[key]
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/array.py:2430, in Array.__getitem__(self, selection)
2428 return self.vindex[cast(CoordinateSelection | MaskSelection, selection)]
2429 elif is_pure_orthogonal_indexing(pure_selection, self.ndim):
-> 2430 return self.get_orthogonal_selection(pure_selection, fields=fields)
2431 else:
2432 return self.get_basic_selection(cast(BasicSelection, pure_selection), fields=fields)
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/_compat.py:43, in _deprecate_positional_args.<locals>._inner_deprecate_positional_args.<locals>.inner_f(*args, **kwargs)
41 extra_args = len(args) - len(all_args)
42 if extra_args <= 0:
---> 43 return f(*args, **kwargs)
45 # extra_args > 0
46 args_msg = [
47 f"{name}={arg}"
48 for name, arg in zip(kwonly_args[:extra_args], args[-extra_args:], strict=False)
49 ]
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/array.py:2872, in Array.get_orthogonal_selection(self, selection, out, fields, prototype)
2870 prototype = default_buffer_prototype()
2871 indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
-> 2872 return sync(
2873 self._async_array._get_selection(
2874 indexer=indexer, out=out, fields=fields, prototype=prototype
2875 )
2876 )
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/sync.py:163, in sync(coro, loop, timeout)
160 return_result = next(iter(finished)).result()
162 if isinstance(return_result, BaseException):
--> 163 raise return_result
164 else:
165 return return_result
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/sync.py:119, in _runner(coro)
114 """
115 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
116 exception, the exception will be returned.
117 """
118 try:
--> 119 return await coro
120 except Exception as ex:
121 return ex
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/array.py:1289, in AsyncArray._get_selection(self, indexer, prototype, out, fields)
1286 _config = replace(_config, order=self.metadata.order)
1288 # reading chunks and decoding them
-> 1289 await self.codec_pipeline.read(
1290 [
1291 (
1292 self.store_path / self.metadata.encode_chunk_key(chunk_coords),
1293 self.metadata.get_chunk_spec(chunk_coords, _config, prototype=prototype),
1294 chunk_selection,
1295 out_selection,
1296 is_complete_chunk,
1297 )
1298 for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer
1299 ],
1300 out_buffer,
1301 drop_axes=indexer.drop_axes,
1302 )
1303 if isinstance(indexer, BasicIndexer) and indexer.shape == ():
1304 return out_buffer.as_scalar()
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/codec_pipeline.py:464, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
458 async def read(
459 self,
460 batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
461 out: NDBuffer,
462 drop_axes: tuple[int, ...] = (),
463 ) -> None:
--> 464 await concurrent_map(
465 [
466 (single_batch_info, out, drop_axes)
467 for single_batch_info in batched(batch_info, self.batch_size)
468 ],
469 self.read_batch,
470 config.get("async.concurrency"),
471 )
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/common.py:68, in concurrent_map(items, func, limit)
65 async with sem:
66 return await func(*item)
---> 68 return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/common.py:66, in concurrent_map.<locals>.run(item)
64 async def run(item: tuple[Any]) -> V:
65 async with sem:
---> 66 return await func(*item)
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/codec_pipeline.py:251, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
244 async def read_batch(
245 self,
246 batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
247 out: NDBuffer,
248 drop_axes: tuple[int, ...] = (),
249 ) -> None:
250 if self.supports_partial_decode:
--> 251 chunk_array_batch = await self.decode_partial_batch(
252 [
253 (byte_getter, chunk_selection, chunk_spec)
254 for byte_getter, chunk_spec, chunk_selection, *_ in batch_info
255 ]
256 )
257 for chunk_array, (_, chunk_spec, _, out_selection, _) in zip(
258 chunk_array_batch, batch_info, strict=False
259 ):
260 if chunk_array is not None:
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/codec_pipeline.py:207, in BatchedCodecPipeline.decode_partial_batch(self, batch_info)
205 assert self.supports_partial_decode
206 assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin)
--> 207 return await self.array_bytes_codec.decode_partial(batch_info)
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/abc/codec.py:198, in ArrayBytesCodecPartialDecodeMixin.decode_partial(self, batch_info)
178 async def decode_partial(
179 self,
180 batch_info: Iterable[tuple[ByteGetter, SelectorTuple, ArraySpec]],
181 ) -> Iterable[NDBuffer | None]:
182 """Partially decodes a batch of chunks.
183 This method determines parts of a chunk from the slice selection,
184 fetches these parts from the store (via ByteGetter) and decodes them.
(...) 196 Iterable[NDBuffer | None]
197 """
--> 198 return await concurrent_map(
199 list(batch_info),
200 self._decode_partial_single,
201 config.get("async.concurrency"),
202 )
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/common.py:68, in concurrent_map(items, func, limit)
65 async with sem:
66 return await func(*item)
---> 68 return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/common.py:66, in concurrent_map.<locals>.run(item)
64 async def run(item: tuple[Any]) -> V:
65 async with sem:
---> 66 return await func(*item)
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/codecs/sharding.py:506, in ShardingCodec._decode_partial_single(self, byte_getter, selection, shard_spec)
503 shard_dict = shard_dict_maybe
504 else:
505 # read some chunks within the shard
--> 506 shard_index = await self._load_shard_index_maybe(byte_getter, chunks_per_shard)
507 if shard_index is None:
508 return None
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/codecs/sharding.py:718, in ShardingCodec._load_shard_index_maybe(self, byte_getter, chunks_per_shard)
713 index_bytes = await byte_getter.get(
714 prototype=numpy_buffer_prototype(),
715 byte_range=RangeByteRequest(0, shard_index_size),
716 )
717 else:
--> 718 index_bytes = await byte_getter.get(
719 prototype=numpy_buffer_prototype(), byte_range=SuffixByteRequest(shard_index_size)
720 )
721 if index_bytes is not None:
722 return await self._decode_shard_index(index_bytes, chunks_per_shard)
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/storage/_common.py:124, in StorePath.get(self, prototype, byte_range)
122 if prototype is None:
123 prototype = default_buffer_prototype()
--> 124 return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
File ~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/storage/_obstore.py:109, in ObjectStore.get(self, key, prototype, byte_range)
107 return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type]
108 elif isinstance(byte_range, SuffixByteRequest):
--> 109 resp = await obs.get_async(
110 self.store, key, options={"range": {"suffix": byte_range.suffix}}
111 )
112 return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type]
113 else:
NotSupportedError: Operation not supported: Azure does not support suffix range requests
Debug source:
NotSupported {
source: "Azure does not support suffix range requests",
}

Code I'm running:

import xarray as xr
import numpy as np
import zarr
from zarr.storage import ObjectStore
from obstore.store import AzureStore
objstore = ObjectStore(
store=AzureStore(
container_name=CONTAINER,
prefix="xr-test/test_shards.zarr-v3",
account_name=ACCOUNT,
sas_key=SAS,
)
)
# Reading sharded array with zarr-python works as expected
root = zarr.create_group(store=objstore, zarr_format=3, overwrite=True)
z1 = root.create_array(name='foo', shape=(10000, 10000), shards=(2000, 2000), chunks=(1000, 1000), dtype='int32')
z1[:] = np.random.randint(0, 100, size=(10000, 10000))
root_read = zarr.open_group(store=objstore, zarr_format=3, mode='r')
root_read['foo'][:]
# Writing to xarray with shards also works
ds = xr.Dataset(
{"foo": xr.DataArray(root_read['foo'][:], dims=['x', 'y'])},
)
objstore_xr = ObjectStore(
store=AzureStore(
container_name=CONTAINER,
prefix="xr-test/test_shards_xr.zarr-v3",
account_name=ACCOUNT,
sas_key=SAS,
)
)
ds.to_zarr(
objstore_xr,
mode='w',
consolidated=False,
zarr_format=3,
encoding={'foo': {'chunks': (1000, 1000), 'shards': (2000, 2000)}}
)
# Opening the dataset also works as expected
ds_n = xr.open_zarr(objstore_xr, consolidated=False)
# However, I get the error when loading the chunks into memory
ds_n.compute()
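For what it's worth, the underlying failure can be reproduced without zarr or xarray by asking obstore for a suffix range directly against Azure. A rough sketch (the chunk key below is a made-up placeholder, and CONTAINER/ACCOUNT/SAS are the same placeholders as above):

import obstore as obs
from obstore.store import AzureStore

store = AzureStore(
    container_name=CONTAINER,
    prefix="xr-test/test_shards_xr.zarr-v3",
    account_name=ACCOUNT,
    sas_key=SAS,
)
# object_store rejects this immediately, because Azure's API has no
# "last N bytes" range request.
obs.get(store, "foo/c/0/0", options={"range": {"suffix": 4}})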
Oh, indeed, this PR so far only fixes the multi-fetch API in
@lsim-aegeri can you try once more?
It worked this time, thank you so much for fixing this! Do you know if this will be included in the next PyPI release? Also, I'm pretty sure this is also an issue when reading a sharded zarr with fsspec. Would you feel comfortable fixing that as well? I'll be fine using
I'm not a primary Zarr maintainer, so I can't say for sure, but I think it's likely. The
Can you create a new issue for that? I'm not familiar with the fsspec backend myself.
Thanks @kylebarron, and thanks for testing @lsim-aegeri. |
Fix for issue raised here: pydata/xarray#10228
The obstore backend performs suffix requests to read sharded Zarr files. Azure does not support suffix requests, and the underlying object_store Rust code immediately errors if a suffix request is performed against Azure.

The workaround probably shouldn't go in object_store or obstore, because we don't want to silently perform two requests when the user asks for one. It's better for users of those libraries to know that they have to opt in to two requests on Azure. So I think it makes sense to have this workaround go here.

Note that this extra HEAD request could be avoided if Zarr metadata already stored the length of each file, but I don't think that's true.
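For illustration, a workaround along these lines resolves the object's length with a HEAD request and then falls back to an explicit offset range. This is only a sketch of the idea, not necessarily the exact code in this PR; head_async, the dict form of the range option, and the size field on the returned metadata are assumptions about the obstore API:

import obstore as obs
from zarr.core.buffer import default_buffer_prototype


async def _get_suffix_via_head(store, key, suffix):
    # Azure cannot serve "last N bytes" directly, so spend one extra HEAD
    # request to learn the object's total length...
    meta = await obs.head_async(store, key)
    offset = max(meta["size"] - suffix, 0)
    # ...then ask for everything from that offset to the end of the object.
    resp = await obs.get_async(store, key, options={"range": {"offset": offset}})
    return default_buffer_prototype().buffer.from_bytes(await resp.bytes_async())

The trade-off is exactly the one described above: the caller pays for two requests, but only on stores that cannot serve suffix ranges natively.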
TODO:
- docs/user-guide/*.rst
- changes/