1
1
'use strict'
2
2
3
3
const errCode = require ( 'err-code' )
4
-
5
- let createRabin
4
+ const Long = require ( 'long' )
5
+ const BufferList = require ( 'bl' )
6
+ let rabin
6
7
7
8
module . exports = async function * rabinChunker ( source , options ) {
8
- if ( ! createRabin ) {
9
+ if ( ! rabin ) {
9
10
try {
10
- createRabin = require ( 'rabin' )
11
-
12
- if ( typeof createRabin !== 'function' ) {
13
- throw errCode ( new Error ( `createRabin was not a function` ) , 'ERR_UNSUPPORTED' )
14
- }
15
- } catch ( err ) {
16
- throw errCode ( new Error ( `Rabin chunker not available, it may have failed to install or not be supported on this platform` ) , 'ERR_UNSUPPORTED' )
11
+ rabin = nativeRabin ( )
12
+ } catch ( _ ) {
13
+ // fallback to js implementation
14
+ rabin = jsRabin ( )
17
15
}
18
16
}
19
17
@@ -30,30 +28,216 @@ module.exports = async function * rabinChunker (source, options) {
30
28
}
31
29
32
30
const sizepow = Math . floor ( Math . log2 ( avg ) )
33
- const rabin = createRabin ( {
31
+
32
+ for await ( const chunk of rabin ( source , {
34
33
min : min ,
35
34
max : max ,
36
35
bits : sizepow ,
37
36
window : options . window ,
38
37
polynomial : options . polynomial
39
- } )
38
+ } ) ) {
39
+ yield chunk
40
+ }
41
+ }
42
+
43
+ const nativeRabin = ( ) => {
44
+ const createRabin = require ( 'rabin' )
45
+
46
+ if ( typeof rabin !== 'function' ) {
47
+ throw errCode ( new Error ( `rabin was not a function` ) , 'ERR_UNSUPPORTED' )
48
+ }
49
+
50
+ return async function * ( source , options ) {
51
+ const rabin = createRabin ( options )
52
+
53
+ // TODO: rewrite rabin using node streams v3
54
+ for await ( const chunk of source ) {
55
+ rabin . buffers . append ( chunk )
56
+ rabin . pending . push ( chunk )
57
+
58
+ const sizes = [ ]
59
+
60
+ rabin . rabin . fingerprint ( rabin . pending , sizes )
61
+ rabin . pending = [ ]
62
+
63
+ for ( let i = 0 ; i < sizes . length ; i ++ ) {
64
+ const size = sizes [ i ]
65
+ const buf = rabin . buffers . slice ( 0 , size )
66
+ rabin . buffers . consume ( size )
67
+
68
+ yield buf
69
+ }
70
+ }
71
+
72
+ if ( rabin . buffers . length ) {
73
+ yield rabin . buffers . slice ( 0 )
74
+ }
75
+ }
76
+ }
77
+
78
+ const jsRabin = ( ) => {
79
+ // see https://github.com./datproject/rabin/blob/c0378395dc0a125ab21ac176ec504f9995b34e62/src/rabin.cc
80
+ class Rabin {
81
+ constructor ( options ) {
82
+ this . window = new Array ( options . window || 64 ) . fill ( Long . fromInt ( 0 ) )
83
+ this . wpos = 0
84
+ this . count = 0
85
+ this . digest = Long . fromInt ( 0 )
86
+ this . chunkLength = 0
87
+ this . polynomial = options . polynomial
88
+ this . polynomialDegree = 53
89
+ this . polynomialShift = this . polynomialDegree - 8
90
+ this . averageBits = options . bits || 12
91
+ this . minSize = options . min || 8 * 1024
92
+ this . maxSize = options . max || 32 * 1024
93
+ this . mask = Long . fromInt ( 1 ) . shiftLeft ( this . averageBits ) . subtract ( 1 )
94
+ this . modTable = [ ]
95
+ this . outTable = [ ]
96
+
97
+ this . calculateTables ( )
98
+ }
99
+
100
+ calculateTables ( ) {
101
+ for ( let i = 0 ; i < 256 ; i ++ ) {
102
+ let hash = Long . fromInt ( 0 , true )
103
+
104
+ hash = this . appendByte ( hash , i )
105
+
106
+ for ( let j = 0 ; j < this . window . length - 1 ; j ++ ) {
107
+ hash = this . appendByte ( hash , 0 )
108
+ }
109
+
110
+ this . outTable [ i ] = hash
111
+ }
112
+
113
+ const k = this . deg ( this . polynomial )
114
+
115
+ for ( let i = 0 ; i < 256 ; i ++ ) {
116
+ const b = Long . fromInt ( i , true )
117
+
118
+ this . modTable [ i ] = b . shiftLeft ( k )
119
+ . modulo ( this . polynomial )
120
+ . or ( b . shiftLeft ( k ) )
121
+ }
122
+ }
123
+
124
+ deg ( p ) {
125
+ let mask = Long . fromString ( '0x8000000000000000' , true , 16 )
126
+
127
+ for ( let i = 0 ; i < 64 ; i ++ ) {
128
+ if ( mask . and ( p ) . greaterThan ( 0 ) ) {
129
+ return Long . fromInt ( 63 - i )
130
+ }
131
+
132
+ mask = mask . shiftRight ( 1 )
133
+ }
40
134
41
- // TODO: rewrite rabin using node streams v3
42
- for await ( const chunk of source ) {
43
- rabin . buffers . append ( chunk )
44
- rabin . pending . push ( chunk )
135
+ return Long . fromInt ( - 1 )
136
+ }
137
+
138
+ appendByte ( hash , b ) {
139
+ hash = hash . shiftLeft ( 8 )
140
+ hash = hash . or ( b )
141
+
142
+ return hash . modulo ( this . polynomial )
143
+ }
144
+
145
+ getFingerprints ( bufs ) {
146
+ const lengths = [ ]
147
+
148
+ for ( let i = 0 ; i < bufs . length ; i ++ ) {
149
+ let buf = bufs [ i ]
150
+
151
+ while ( true ) {
152
+ const remaining = this . nextChunk ( buf )
153
+
154
+ if ( remaining < 0 ) {
155
+ break
156
+ }
157
+
158
+ buf = buf . slice ( remaining )
159
+
160
+ lengths . push ( this . chunkLength )
161
+ }
162
+ }
163
+
164
+ return lengths
165
+ }
166
+
167
+ nextChunk ( buf ) {
168
+ for ( let i = 0 ; i < buf . length ; i ++ ) {
169
+ const val = Long . fromInt ( buf [ i ] )
170
+
171
+ this . slide ( val )
172
+
173
+ this . count ++
174
+
175
+ if ( ( this . count >= this . minSize && this . digest . and ( this . mask ) . equals ( 0 ) ) || this . count >= this . maxSize ) {
176
+ this . chunkLength = this . count
177
+
178
+ this . reset ( )
179
+
180
+ return i + 1
181
+ }
182
+ }
183
+
184
+ return - 1
185
+ }
186
+
187
+ slide ( value ) {
188
+ const out = this . window [ this . wpos ] . toInt ( ) & 255
189
+ this . window [ this . wpos ] = value
190
+ this . digest = this . digest . xor ( this . outTable [ out ] )
191
+ this . wpos = ( this . wpos + 1 ) % this . window . length
192
+
193
+ this . append ( value )
194
+ }
195
+
196
+ reset ( ) {
197
+ this . window = this . window . map ( ( ) => Long . fromInt ( 0 ) )
198
+ this . wpos = 0
199
+ this . count = 0
200
+ this . digest = Long . fromInt ( 0 )
45
201
46
- const sizes = [ ]
202
+ this . slide ( Long . fromInt ( 1 ) )
203
+ }
47
204
48
- rabin . rabin . fingerprint ( rabin . pending , sizes )
49
- rabin . pending = [ ]
205
+ append ( value ) {
206
+ const index = this . digest . shiftRight ( this . polynomialShift ) . toInt ( ) & 255
207
+ this . digest = this . digest . shiftLeft ( 8 )
208
+ this . digest = this . digest . or ( value )
50
209
51
- for ( let i = 0 ; i < sizes . length ; i ++ ) {
52
- const size = sizes [ i ]
53
- const buf = rabin . buffers . slice ( 0 , size )
54
- rabin . buffers . consume ( size )
210
+ const entry = this . modTable [ index ]
211
+
212
+ if ( entry ) {
213
+ this . digest = this . digest . xor ( entry )
214
+ }
215
+ }
216
+ }
217
+
218
+ return async function * ( source , options ) {
219
+ const r = new Rabin ( options )
220
+ const buffers = new BufferList ( )
221
+ let pending = [ ]
222
+
223
+ for await ( const chunk of source ) {
224
+ buffers . append ( chunk )
225
+ pending . push ( chunk )
226
+
227
+ const sizes = r . getFingerprints ( pending )
228
+ pending = [ ]
229
+
230
+ for ( let i = 0 ; i < sizes . length ; i ++ ) {
231
+ var size = sizes [ i ]
232
+ var buf = buffers . slice ( 0 , size )
233
+ buffers . consume ( size )
234
+
235
+ yield buf
236
+ }
237
+ }
55
238
56
- yield buf
239
+ if ( buffers . length ) {
240
+ yield buffers . slice ( 0 )
57
241
}
58
242
}
59
243
}
0 commit comments