Skip to content

Commit 08d0adb

Browse files
committed
RPC headers: add request retries support
1 parent a26c921 commit 08d0adb

File tree

1 file changed

+133
-13
lines changed
  • rsocket-messages/src/main/java/com/jauntsdn/rsocket

1 file changed

+133
-13
lines changed

rsocket-messages/src/main/java/com/jauntsdn/rsocket/Headers.java

+133-13
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,26 @@
3434
public final class Headers {
3535
public static int HEADER_LENGTH_MAX = 8192;
3636

37-
private static final Headers EMPTY = new Headers(false, 0, Collections.emptyList(), 0);
38-
private static final Headers DEFAULT_SERVICE = new Headers(true, 0, Collections.emptyList(), 0);
37+
private static final Headers EMPTY = new Headers(false, 0, null, Collections.emptyList(), 0);
38+
private static final Headers DEFAULT_SERVICE =
39+
new Headers(true, 0, null, Collections.emptyList(), 0);
3940

4041
private final boolean isDefaultService;
4142
private final int serializedSize;
4243
private final long timeoutMillis;
44+
private final RetryPolicy retryPolicy;
4345
private final List<String> keyValues;
4446
private volatile ByteBuf cache;
4547

4648
private Headers(
47-
boolean isDefaultService, long timeoutMillis, List<String> keyValues, int serializedSize) {
49+
boolean isDefaultService,
50+
long timeoutMillis,
51+
@Nullable RetryPolicy retryPolicy,
52+
List<String> keyValues,
53+
int serializedSize) {
4854
this.isDefaultService = isDefaultService;
4955
this.timeoutMillis = timeoutMillis;
56+
this.retryPolicy = retryPolicy;
5057
this.keyValues = keyValues;
5158
this.serializedSize = serializedSize;
5259
}
@@ -59,6 +66,10 @@ public long timeoutMillis() {
5966
return timeoutMillis;
6067
}
6168

69+
public RetryPolicy retryPolicy() {
70+
return retryPolicy;
71+
}
72+
6273
public String header(String name) {
6374
if (!isValidKeySize(name)) {
6475
return null;
@@ -142,7 +153,16 @@ public String setValue(String value) {
142153

143154
@Override
144155
public String toString() {
145-
return "Headers{" + "isDefaultService=" + isDefaultService + ", keyValues=" + keyValues + '}';
156+
return "Headers{"
157+
+ "isDefaultService="
158+
+ isDefaultService
159+
+ ", timeoutMillis="
160+
+ timeoutMillis
161+
+ ", retryPolicy="
162+
+ retryPolicy
163+
+ ", keyValues="
164+
+ keyValues
165+
+ '}';
146166
}
147167

148168
public Headers.Builder toBuilder() {
@@ -162,7 +182,7 @@ public static Headers create(boolean isDefaultService, String... headers) {
162182
if (headers.length == 0) {
163183
return isDefaultService ? DEFAULT_SERVICE : EMPTY;
164184
}
165-
return new Headers(isDefaultService, 0, Arrays.asList(headers), serializedSize);
185+
return new Headers(isDefaultService, 0, null, Arrays.asList(headers), serializedSize);
166186
}
167187

168188
public static Headers empty() {
@@ -178,7 +198,13 @@ public static Headers withTimeout(long timeoutMillis) {
178198
if (timeoutMillis == 0) {
179199
return EMPTY;
180200
}
181-
return new Headers(false, timeoutMillis, Collections.emptyList(), 0);
201+
return new Headers(false, timeoutMillis, null, Collections.emptyList(), 0);
202+
}
203+
204+
public static Headers withRetry(long timeoutMillis, RetryPolicy retryPolicy) {
205+
requireNonNegative(timeoutMillis, "timeoutMillis");
206+
Objects.requireNonNull(retryPolicy, "retryPolicy");
207+
return new Headers(false, timeoutMillis, retryPolicy, Collections.emptyList(), 0);
182208
}
183209

184210
public static Headers.Builder newBuilder() {
@@ -194,7 +220,7 @@ static Headers create(List<String> headers) {
194220
if (headers.isEmpty()) {
195221
return EMPTY;
196222
}
197-
return new Headers(false, 0, headers, serializedSize);
223+
return new Headers(false, 0, null, headers, serializedSize);
198224
}
199225

200226
ByteBuf cache() {
@@ -220,6 +246,7 @@ public static final class Builder {
220246
private boolean isDefaultService;
221247
private long timeoutMillis;
222248
private int serializedSize;
249+
private RetryPolicy retryPolicy;
223250

224251
private Builder(int size, List<String> headers) {
225252
int length = headers.size();
@@ -246,6 +273,11 @@ public Builder timeout(long timeoutMillis) {
246273
return this;
247274
}
248275

276+
public Builder retryPolicy(RetryPolicy retryPolicy) {
277+
this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy");
278+
return this;
279+
}
280+
249281
public Builder add(String name, String value) {
250282
requireValidKeySize(name, " name");
251283
requireValidValueSize(value, " value");
@@ -299,16 +331,90 @@ public Builder remove(String name, String value) {
299331
}
300332

301333
public Headers build() {
302-
return new Headers(isDefaultService, timeoutMillis, nameValues, serializedSize);
334+
return new Headers(isDefaultService, timeoutMillis, retryPolicy, nameValues, serializedSize);
303335
}
304336
}
305337

306-
private static String requireNonEmpty(String seq, String message) {
307-
Objects.requireNonNull(seq, message);
308-
if (seq.length() == 0) {
309-
throw new IllegalArgumentException(message + " must be non-empty");
338+
/**
339+
* Configures automatic retry of failed requests. Intended mostly for RPC implementations lacking
340+
* API support for retries, e.g. futures, grpc-stub.
341+
*/
342+
public static final class RetryPolicy {
343+
private final int maxAttempts;
344+
private final float initialBackoff;
345+
private final float maxBackoff;
346+
private final int backoffMultiplier;
347+
348+
private RetryPolicy(
349+
int maxAttempts, float initialBackoff, float maxBackoff, int backoffMultiplier) {
350+
this.maxAttempts = maxAttempts;
351+
this.initialBackoff = initialBackoff;
352+
this.maxBackoff = maxBackoff;
353+
this.backoffMultiplier = backoffMultiplier;
354+
}
355+
356+
public static RetryPolicy create(
357+
int maxAttempts, long initialBackoffMillis, long maxBackoffMillis, int backoffMultiplier) {
358+
return new RetryPolicy(
359+
requirePositive(maxAttempts, "maxAttempts"),
360+
requireNonNegative(initialBackoffMillis, "initialBackoffMillis"),
361+
requireNonNegative(maxBackoffMillis, "maxBackoffMillis"),
362+
requireNonNegative(backoffMultiplier, "backoffMultiplier"));
363+
}
364+
365+
public static RetryPolicy.Builder newBuilder() {
366+
return new Builder();
367+
}
368+
369+
@Override
370+
public String toString() {
371+
return "RetryPolicy{"
372+
+ "maxAttempts="
373+
+ maxAttempts
374+
+ ", initialBackoff="
375+
+ initialBackoff
376+
+ ", maxBackoff="
377+
+ maxBackoff
378+
+ ", backoffMultiplier="
379+
+ backoffMultiplier
380+
+ '}';
381+
}
382+
383+
public static final class Builder {
384+
private int maxAttempts;
385+
private long initialBackoff;
386+
private long maxBackoff;
387+
private int backoffMultiplier;
388+
389+
public Builder() {}
390+
391+
public Builder maxAttempts(int maxAttempts) {
392+
this.maxAttempts = requirePositive(maxAttempts, "maxAttempts");
393+
return this;
394+
}
395+
396+
public Builder initialBackoff(long initialBackoffMillis) {
397+
this.initialBackoff = requireNonNegative(initialBackoffMillis, "initialBackoffMillis");
398+
return this;
399+
}
400+
401+
public Builder maxBackoff(long maxBackoffMillis) {
402+
this.maxBackoff = requireNonNegative(maxBackoffMillis, "maxBackoffMillis");
403+
return this;
404+
}
405+
406+
public Builder backoffMultiplier(int backoffMultiplier) {
407+
this.backoffMultiplier = requireNonNegative(backoffMultiplier, "backoffMultiplier");
408+
return this;
409+
}
410+
411+
public RetryPolicy build() {
412+
if (initialBackoff > maxBackoff) {
413+
throw new IllegalArgumentException("initialBackoff exceeds maxBackoff");
414+
}
415+
return new RetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
416+
}
310417
}
311-
return seq;
312418
}
313419

314420
private static int requireValid(List<String> keyValues, String message) {
@@ -344,6 +450,20 @@ private static long requireNonNegative(long value, String message) {
344450
return value;
345451
}
346452

453+
private static int requireNonNegative(int value, String message) {
454+
if (value < 0) {
455+
throw new IllegalArgumentException(message + " must be non-negative");
456+
}
457+
return value;
458+
}
459+
460+
private static int requirePositive(int value, String message) {
461+
if (value <= 0) {
462+
throw new IllegalArgumentException(message + " must be positive");
463+
}
464+
return value;
465+
}
466+
347467
private static int requireValid(String[] keyValues, String message) {
348468
Objects.requireNonNull(keyValues, "keyValues");
349469
int length = keyValues.length;

0 commit comments

Comments
 (0)