Skip to content

Added a HybridCachingOption to throw an error if the distributed cache throws an error #413

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ public class HybridCachingOptions
/// <value><c>true</c> if enable logging; otherwise, <c>false</c>.</value>
public bool EnableLogging { get; set; }

/// <summary>
/// Gets or sets a value indicating whether an exception should be thrown if an error on the distributed cache has occurred
/// </summary>
/// <value><c>true</c> if distributed cache exceptions should not be ignored; otherwise, <c>false</c>.</value>
public bool ThrowIfDistributedCacheError { get; set; }

/// <summary>
/// local cache provider name
/// </summary>
Expand Down
141 changes: 123 additions & 18 deletions src/EasyCaching.HybridCache/HybridCachingProvider.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
namespace EasyCaching.HybridCache
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EasyCaching.Core;
using EasyCaching.Core.Bus;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.Fallback;
using Polly.Retry;
using Polly.Wrap;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Hybrid caching provider.
Expand Down Expand Up @@ -123,7 +123,7 @@ string name
/// <param name="message">Message.</param>
private void OnMessage(EasyCachingMessage message)
{
// each clients will recive the message, current client should ignore.
// each clients will receive the message, current client should ignore.
if (!string.IsNullOrWhiteSpace(message.Id) && message.Id.Equals(_cacheId, StringComparison.OrdinalIgnoreCase))
return;

Expand All @@ -136,7 +136,7 @@ private void OnMessage(EasyCachingMessage message)

LogMessage($"remove local cache that pattern is {pattern}");

return;
return;
}

// remove by prefix
Expand Down Expand Up @@ -178,6 +178,11 @@ public bool Exists(string cacheKey)
catch (Exception ex)
{
LogMessage($"Check cache key exists error [{cacheKey}] ", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

flag = _localCache.Exists(cacheKey);
Expand All @@ -186,7 +191,7 @@ public bool Exists(string cacheKey)
}

/// <summary>
/// Existses the specified cacheKey async.
/// Exists the specified cacheKey async.
/// </summary>
/// <returns>The async.</returns>
/// <param name="cacheKey">Cache key.</param>
Expand All @@ -206,6 +211,11 @@ public async Task<bool> ExistsAsync(string cacheKey, CancellationToken cancellat
catch (Exception ex)
{
LogMessage($"Check cache key [{cacheKey}] exists error", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

flag = await _localCache.ExistsAsync(cacheKey, cancellationToken);
Expand Down Expand Up @@ -239,12 +249,17 @@ public CacheValue<T> Get<T>(string cacheKey)
catch (Exception ex)
{
LogMessage($"distributed cache get error, [{cacheKey}]", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

if (cacheValue.HasValue)
{
TimeSpan ts = GetExpiration(cacheKey);

_localCache.Set(cacheKey, cacheValue.Value, ts);

return cacheValue;
Expand Down Expand Up @@ -282,6 +297,11 @@ public async Task<CacheValue<T>> GetAsync<T>(string cacheKey, CancellationToken
catch (Exception ex)
{
LogMessage($"distributed cache get error, [{cacheKey}]", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

if (cacheValue.HasValue)
Expand Down Expand Up @@ -314,6 +334,11 @@ public void Remove(string cacheKey)
catch (Exception ex)
{
LogMessage($"remove cache key [{cacheKey}] error", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

_localCache.Remove(cacheKey);
Expand All @@ -340,6 +365,11 @@ public async Task RemoveAsync(string cacheKey, CancellationToken cancellationTok
catch (Exception ex)
{
LogMessage($"remove cache key [{cacheKey}] error", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

await _localCache.RemoveAsync(cacheKey, cancellationToken);
Expand Down Expand Up @@ -368,6 +398,11 @@ public void Set<T>(string cacheKey, T cacheValue, TimeSpan expiration)
catch (Exception ex)
{
LogMessage($"set cache key [{cacheKey}] error", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

// When create/update cache, send message to bus so that other clients can remove it.
Expand Down Expand Up @@ -396,6 +431,11 @@ public async Task SetAsync<T>(string cacheKey, T cacheValue, TimeSpan expiration
catch (Exception ex)
{
LogMessage($"set cache key [{cacheKey}] error", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

// When create/update cache, send message to bus so that other clients can remove it.
Expand Down Expand Up @@ -425,6 +465,11 @@ public bool TrySet<T>(string cacheKey, T cacheValue, TimeSpan expiration)
{
distributedError = true;
LogMessage($"tryset cache key [{cacheKey}] error", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

if (flag && !distributedError)
Expand Down Expand Up @@ -473,6 +518,11 @@ public async Task<bool> TrySetAsync<T>(string cacheKey, T cacheValue, TimeSpan e
{
distributedError = true;
LogMessage($"tryset cache key [{cacheKey}] error", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

if (flag && !distributedError)
Expand Down Expand Up @@ -514,6 +564,11 @@ public void SetAll<T>(IDictionary<string, T> value, TimeSpan expiration)
catch (Exception ex)
{
LogMessage($"set all from distributed provider error [{string.Join(",", value.Keys)}]", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

// send message to bus
Expand All @@ -539,6 +594,11 @@ public async Task SetAllAsync<T>(IDictionary<string, T> value, TimeSpan expirati
catch (Exception ex)
{
LogMessage($"set all from distributed provider error [{string.Join(",", value.Keys)}]", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

// send message to bus
Expand All @@ -555,11 +615,16 @@ public void RemoveAll(IEnumerable<string> cacheKeys)

try
{
_distributedCache.RemoveAllAsync(cacheKeys);
_distributedCache.RemoveAll(cacheKeys);
}
catch (Exception ex)
{
LogMessage($"remove all from distributed provider error [{string.Join(",", cacheKeys)}]", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

_localCache.RemoveAll(cacheKeys);
Expand All @@ -585,6 +650,11 @@ public async Task RemoveAllAsync(IEnumerable<string> cacheKeys, CancellationToke
catch (Exception ex)
{
LogMessage($"remove all async from distributed provider error [{string.Join(",", cacheKeys)}]", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

await _localCache.RemoveAllAsync(cacheKeys, cancellationToken);
Expand Down Expand Up @@ -620,6 +690,11 @@ public CacheValue<T> Get<T>(string cacheKey, Func<T> dataRetriever, TimeSpan exp
catch (Exception ex)
{
LogMessage($"get with data retriever from distributed provider error [{cacheKey}]", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

if (result.HasValue)
Expand Down Expand Up @@ -662,13 +737,18 @@ public async Task<CacheValue<T>> GetAsync<T>(string cacheKey, Func<Task<T>> data
catch (Exception ex)
{
LogMessage($"get async with data retriever from distributed provider error [{cacheKey}]", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

if (result.HasValue)
{
TimeSpan ts = await GetExpirationAsync(cacheKey, cancellationToken);

_localCache.Set(cacheKey, result.Value, ts);
await _localCache.SetAsync(cacheKey, result.Value, ts, cancellationToken);

return result;
}
Expand All @@ -691,6 +771,11 @@ public void RemoveByPrefix(string prefix)
catch (Exception ex)
{
LogMessage($"remove by prefix [{prefix}] error", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

_localCache.RemoveByPrefix(prefix);
Expand All @@ -716,14 +801,19 @@ public async Task RemoveByPrefixAsync(string prefix, CancellationToken cancellat
catch (Exception ex)
{
LogMessage($"remove by prefix [{prefix}] error", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

await _localCache.RemoveByPrefixAsync(prefix);
await _localCache.RemoveByPrefixAsync(prefix, cancellationToken);

// send message to bus in order to notify other clients.
await _busAsyncWrap.ExecuteAsync(async (ct) => await _bus.PublishAsync(_options.TopicName, new EasyCachingMessage { Id = _cacheId, CacheKeys = new string[] { prefix }, IsPrefix = true }, ct), cancellationToken);
}

/// <summary>
/// Removes the by pattern async.
/// </summary>
Expand All @@ -741,14 +831,19 @@ public async Task RemoveByPatternAsync(string pattern, CancellationToken cancell
catch (Exception ex)
{
LogMessage($"remove by pattern [{pattern}] error", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

await _localCache.RemoveByPatternAsync(pattern);
await _localCache.RemoveByPatternAsync(pattern, cancellationToken);

// send message to bus in order to notify other clients.
await _busAsyncWrap.ExecuteAsync(async (ct) => await _bus.PublishAsync(_options.TopicName, new EasyCachingMessage { Id = _cacheId, CacheKeys = new string[] { pattern }, IsPattern = true}, ct), cancellationToken);
await _busAsyncWrap.ExecuteAsync(async (ct) => await _bus.PublishAsync(_options.TopicName, new EasyCachingMessage { Id = _cacheId, CacheKeys = new string[] { pattern }, IsPattern = true }, ct), cancellationToken);
}

/// <summary>
/// Removes the by pattern.
/// </summary>
Expand All @@ -765,12 +860,17 @@ public void RemoveByPattern(string pattern)
catch (Exception ex)
{
LogMessage($"remove by pattern [{pattern}] error", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

_localCache.RemoveByPattern(pattern);

// send message to bus
_busSyncWrap.Execute(() => _bus.Publish(_options.TopicName, new EasyCachingMessage { Id = _cacheId, CacheKeys = new string[] { pattern }, IsPattern = true}));
_busSyncWrap.Execute(() => _bus.Publish(_options.TopicName, new EasyCachingMessage { Id = _cacheId, CacheKeys = new string[] { pattern }, IsPattern = true }));
}

/// <summary>
Expand Down Expand Up @@ -855,12 +955,17 @@ public async Task<object> GetAsync(string cacheKey, Type type, CancellationToken
catch (Exception ex)
{
LogMessage($"distributed cache get error, [{cacheKey}]", ex);

if (_options.ThrowIfDistributedCacheError)
{
throw;
}
}

if (cacheValue != null)
{
TimeSpan ts = await GetExpirationAsync(cacheKey, cancellationToken);

await _localCache.SetAsync(cacheKey, cacheValue, ts, cancellationToken);

return cacheValue;
Expand Down
Loading