-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfig_backup_xdr.go
201 lines (171 loc) · 6.35 KB
/
config_backup_xdr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
// Copyright 2024 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package backup
import (
"crypto/tls"
"fmt"
"regexp"
"strconv"
"time"
a "github.com./aerospike/aerospike-client-go/v8"
"github.com./aerospike/backup-go/models"
)
var expDCName = regexp.MustCompile(`^[a-zA-Z0-9_\-$]+$`)
// ConfigBackupXDR contains configuration for the xdr backup operation.
type ConfigBackupXDR struct {
// InfoPolicy applies to Aerospike Info requests made during backup and
// restore. If nil, the Aerospike client's default policy will be used.
InfoPolicy *a.InfoPolicy
// Encryption details.
EncryptionPolicy *EncryptionPolicy
// Compression details.
CompressionPolicy *CompressionPolicy
// Secret agent config.
SecretAgentConfig *SecretAgentConfig
// EncoderType describes an Encoder type that will be used on backing up.
// For XDR must be set to `EncoderTypeASBX` = 1.
EncoderType EncoderType
// File size limit (in bytes) for the backup. If a backup file exceeds this
// size threshold, a new file will be created. 0 for no file size limit.
FileLimit uint64
// ParallelWrite is the number of concurrent backup files writing.
ParallelWrite int
// DC name of dc that will be created on source instance.
DC string
// Local address, where source cluster will send data.
LocalAddress string
// Local port, where source cluster will send data.
LocalPort int
// Namespace is the Aerospike namespace to back up.
Namespace string
// Rewind is used to ship all existing records of a namespace.
// When rewinding a namespace, XDR will scan through the index and ship
// all the records for that namespace, partition by partition.
// Can be `all` or number of seconds.
Rewind string
// MaxThroughput number of records per second to ship using XDR. Must be in increments of 100.
// If 0, the default server value will be used.
MaxThroughput int
// TLS config for secure XDR connection.
TLSConfig *tls.Config
// Timeout for TCP read operations.
// Used by TCP server for XDR.
ReadTimeout time.Duration
// Timeout for TCP writes operations.
// Used by TCP server for XDR.
WriteTimeout time.Duration
// Results queue size.
// Used by TCP server for XDR.
ResultQueueSize int
// Ack messages queue size.
// Used by TCP server for XDR.
AckQueueSize int
// Max number of allowed simultaneous connection to server.
// Used by TCP server for XDR.
MaxConnections int
// How often a backup client will send info commands to check aerospike cluster stats.
// To measure recovery state and lag.
InfoPolingPeriod time.Duration
// Timeout for starting TCP server for XDR.
// If the TCP server for XDR does not receive any data within this timeout period, it will shut down.
// This situation can occur if the LocalAddress and LocalPort options are misconfigured.
StartTimeout time.Duration
// Retry policy for info commands.
InfoRetryPolicy *models.RetryPolicy
// By default XDR writes that originated from another XDR are not forwarded to the specified destination
// datacenters. Setting this parameter to true sends writes that originated from another XDR to the specified
// destination datacenters.
Forward bool
// MetricsEnabled indicates whether backup metrics collection and reporting are enabled.
MetricsEnabled bool
}
// validate validates the ConfigBackupXDR.
//
//nolint:gocyclo // contains a long list of validations
func (c *ConfigBackupXDR) validate() error {
if err := validateRewind(c.Rewind); err != nil {
return err
}
if c.DC == "" {
return fmt.Errorf("dc name must not be empty")
}
if len(c.DC) > 31 {
return fmt.Errorf("dc name must be less than 32 characters")
}
if !expDCName.MatchString(c.DC) {
return fmt.Errorf("dc name must match ^[a-zA-Z0-9_\\-$]+$")
}
if c.ParallelWrite < MinParallel || c.ParallelWrite > MaxParallel {
return fmt.Errorf("parallel write must be between 1 and 1024, got %d", c.ParallelWrite)
}
if c.LocalAddress == "" {
return fmt.Errorf("local address must not be empty")
}
if c.LocalPort < 0 || c.LocalPort > 65535 {
return fmt.Errorf("local port must be between 0 and 65535, got %d", c.LocalPort)
}
if c.Namespace == "" {
return fmt.Errorf("namespace must not be empty")
}
if c.ReadTimeout < 0 {
return fmt.Errorf("read timeout must not be negative, got %d", c.ReadTimeout)
}
if c.WriteTimeout < 0 {
return fmt.Errorf("write timeout must not be negative, got %d", c.WriteTimeout)
}
if c.ResultQueueSize < 0 {
return fmt.Errorf("result queue size must not be negative, got %d", c.ResultQueueSize)
}
if c.AckQueueSize < 0 {
return fmt.Errorf("ack queue size must not be negative, got %d", c.AckQueueSize)
}
if c.MaxConnections < 1 {
return fmt.Errorf("max connections must not be less than 1, got %d", c.MaxConnections)
}
if c.InfoPolingPeriod < 1 {
return fmt.Errorf("info poling period must not be less than 1, got %d", c.InfoPolingPeriod)
}
if err := c.CompressionPolicy.validate(); err != nil {
return fmt.Errorf("compression policy invalid: %w", err)
}
if err := c.EncryptionPolicy.validate(); err != nil {
return fmt.Errorf("encryption policy invalid: %w", err)
}
if err := c.SecretAgentConfig.validate(); err != nil {
return fmt.Errorf("secret agent invalid: %w", err)
}
if err := c.InfoRetryPolicy.Validate(); err != nil {
return fmt.Errorf("invalid info retry policy: %w", err)
}
if c.EncoderType != EncoderTypeASBX {
return fmt.Errorf("unsuported encoder type: %d", c.EncoderType)
}
if c.MaxThroughput%100 != 0 {
return fmt.Errorf("max throughput must be a multiple of 100, got %d", c.MaxThroughput)
}
if c.MaxThroughput < 0 {
return fmt.Errorf("max throughput must not be negative, got %d", c.MaxThroughput)
}
return nil
}
func validateRewind(value string) error {
if value == "all" {
return nil
}
num, err := strconv.ParseUint(value, 10, 64)
if err != nil || num == 0 {
return fmt.Errorf("rewind must be a positive number or 'all', got: %s", value)
}
return nil
}