@@ -10,7 +10,8 @@ const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded')
 const series = require('async/series')
 const log = require('debug')('ipfs:mfs:core:utils:add-link')
 const UnixFS = require('ipfs-unixfs')
-const Bucket = require('hamt-sharding')
+const Bucket = require('hamt-sharding/src/bucket')
+const loadNode = require('./load-node')

 const defaultOptions = {
   parent: undefined,
@@ -78,11 +79,29 @@ const addLink = (context, options, callback) => {
     return convertToShardedDirectory(context, options, callback)
   }

-  log('Adding to regular directory')
+  log(`Adding ${options.name} to regular directory`)

   addToDirectory(context, options, callback)
 }

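+// create a sharded directory from all of the existing directory's links
+// plus the new child, passing the new root node and CID to the callback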
+const convertToShardedDirectory = (context, options, callback) => {
+  createShard(context, options.parent.links.map(link => ({
+    name: link.name,
+    size: link.size,
+    multihash: link.cid.buffer
+  })).concat({
+    name: options.name,
+    size: options.size,
+    multihash: options.cid.buffer
+  }), {}, (err, result) => {
+    if (!err) {
+      log('Converted directory to sharded directory', result.cid.toBaseEncodedString())
+    }
+
+    callback(err, result)
+  })
+}
+
 const addToDirectory = (context, options, callback) => {
   waterfall([
     (done) => {
@@ -112,125 +131,116 @@ const addToDirectory = (context, options, callback) => {
   ], callback)
 }

-const addToShardedDirectory = async (context, options, callback) => {
-  const bucket = new Bucket({
-    hashFn: DirSharded.hashFn
-  })
-  const position = await bucket._findNewBucketAndPos(options.name)
-  const prefix = position.pos
+const addToShardedDirectory = (context, options, callback) => {
+  return waterfall([
+    (cb) => recreateHamtLevel(options.parent.links, cb),
+    (rootBucket, cb) => findPosition(options.name, rootBucket, (err, position) => cb(err, { rootBucket, position })),
+    ({ rootBucket, position }, cb) => {
+      // the path to the root bucket
+      let path = [{
+        position: position.pos,
+        bucket: position.bucket
+      }]
+      let currentBucket = position.bucket
+
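+      // walk up from the target bucket to the root, recording each
+      // bucket's position within its parent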
+      while (currentBucket !== rootBucket) {
+        path.push({
+          bucket: currentBucket,
+          position: currentBucket._posAtParent
+        })
+
+        currentBucket = currentBucket._parent
+      }
+
+      cb(null, {
+        rootBucket,
+        path
+      })
+    },
+    ({ rootBucket, path }, cb) => updateShard(context, options.parent, rootBucket, path, {
+      name: options.name,
+      cid: options.cid,
+      size: options.size
+    }, options, (err, results = {}) => cb(err, { rootBucket, node: results.node })),
+    ({ rootBucket, node }, cb) => updateHamtDirectory(context, node.links, rootBucket, options, cb)
+  ], callback)
+}
+
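+// pop the deepest level off `positions` and add the child at that level,
+// converting a colliding file into a sub-shard or descending into an
+// existing sub-shard as required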
+const updateShard = (context, parent, rootBucket, positions, child, options, callback) => {
+  const {
+    bucket,
+    position
+  } = positions.pop()
+
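+  // shard link names are prefixed with the bucket position as a
+  // two-character uppercase hex string, e.g. position 10 becomes '0A'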
+  const prefix = position
     .toString('16')
     .toUpperCase()
     .padStart(2, '0')
     .substring(0, 2)

-  const existingSubShard = options.parent.links
-    .filter(link => link.name === prefix)
-    .pop()
-
-  if (existingSubShard) {
-    log(`Descending into sub-shard ${prefix} to add link ${options.name}`)
-
-    return addLink(context, {
-      ...options,
-      parent: null,
-      parentCid: existingSubShard.cid
-    }, (err, { cid, node }) => {
-      if (err) {
-        return callback(err)
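+  // a link with a matching prefix is either an existing sub-shard ('XX')
+  // or a colliding file ('XX' + name); an exact match for the child is
+  // skipped here and simply replaced by the final branch below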
+  const link = parent.links
+    .find(link => link.name.substring(0, 2) === prefix && link.name !== `${prefix}${child.name}`)
+
+  return waterfall([
+    (cb) => {
+      if (link && link.name.length > 2) {
+        log(`Converting existing file ${link.name} into sub-shard for ${child.name}`)
+
+        return waterfall([
+          (done) => createShard(context, [{
+            name: link.name.substring(2),
+            size: link.size,
+            multihash: link.cid.buffer
+          }, {
+            name: child.name,
+            size: child.size,
+            multihash: child.cid.buffer
+          }], {}, done),
+          ({ node: { links: [shard] } }, done) => {
+            return context.ipld.get(shard.cid, (err, result) => {
+              done(err, { cid: shard.cid, node: result && result.value })
+            })
+          },
+          ({ cid, node }, cb) => updateShardParent(context, bucket, parent, link.name, node, cid, prefix, options, cb)
+        ], cb)
       }

-      // make sure parent is updated with new sub-shard cid
-      addToDirectory(context, {
-        ...options,
-        parent: options.parent,
-        parentCid: options.parentCid,
-        name: prefix,
-        size: node.size,
-        cid: cid
-      }, callback)
-    })
-  }
-
-  const existingFile = options.parent.links
-    .filter(link => link.name.substring(2) === options.name)
-    .pop()
-
-  if (existingFile) {
-    log(`Updating file ${existingFile.name}`)
-
-    return addToDirectory(context, {
-      ...options,
-      name: existingFile.name
-    }, callback)
-  }
-
-  const existingUnshardedFile = options.parent.links
-    .filter(link => link.name.substring(0, 2) === prefix)
-    .pop()
-
-  if (existingUnshardedFile) {
-    log(`Replacing file ${existingUnshardedFile.name} with sub-shard`)
-
-    return createShard(context, [{
-      name: existingUnshardedFile.name.substring(2),
-      size: existingUnshardedFile.size,
-      multihash: existingUnshardedFile.cid.buffer
-    }, {
-      name: options.name,
-      size: options.size,
-      multihash: options.cid.buffer
-    }], {
-      root: false
-    }, (err, result) => {
-      if (err) {
-        return callback(err)
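+      // an existing sub-shard occupies this position: load it and
+      // recurse one level down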
+      if (link && link.name.length === 2) {
+        log(`Descending into sub-shard`, child.name)
+
+        return waterfall([
+          (cb) => loadNode(context, link, cb),
+          ({ node }, cb) => {
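+            // repopulate this level of the HAMT so the bucket's bit field
+            // reflects the sub-shard's existing entries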
+            Promise.all(
+              node.links.map(link => {
+                if (link.name.length === 2) {
+                  // add a bucket for the subshard of this subshard
+                  const pos = parseInt(link.name, 16)
+
+                  bucket._putObjectAt(pos, new Bucket({
+                    hashFn: DirSharded.hashFn
+                  }, bucket, pos))
+
+                  return Promise.resolve()
+                }
+
+                // add to the root and let changes cascade down
+                return rootBucket.put(link.name.substring(2), true)
+              })
+            )
+              .then(() => cb(null, { node }))
+              .catch(error => cb(error))
+          },
+          ({ node }, cb) => updateShard(context, node, bucket, positions, child, options, cb),
+          ({ cid, node }, cb) => updateShardParent(context, bucket, parent, link.name, node, cid, prefix, options, cb)
+        ], cb)
       }

-      const newShard = result.node.links[0]
-
-      waterfall([
-        (done) => DAGNode.rmLink(options.parent, existingUnshardedFile.name, done),
-        (parent, done) => DAGNode.addLink(parent, newShard, done),
-        (parent, done) => {
-          // Persist the new parent DAGNode
-          context.ipld.put(parent, {
-            version: options.cidVersion,
-            format: options.codec,
-            hashAlg: options.hashAlg,
-            hashOnly: !options.flush
-          }, (error, cid) => done(error, {
-            node: parent,
-            cid
-          }))
-        }
-      ], callback)
-    })
-  }
-
-  log(`Appending ${prefix + options.name} to shard`)
-
-  return addToDirectory(context, {
-    ...options,
-    name: prefix + options.name
-  }, callback)
-}
+      log(`Adding or replacing file`, prefix + child.name)

-const convertToShardedDirectory = (context, options, callback) => {
-  createShard(context, options.parent.links.map(link => ({
-    name: link.name,
-    size: link.size,
-    multihash: link.cid.buffer
-  })).concat({
-    name: options.name,
-    size: options.size,
-    multihash: options.cid.buffer
-  }), {}, (err, result) => {
-    if (!err) {
-      log('Converted directory to sharded directory', result.cid.toBaseEncodedString())
+      updateShardParent(context, bucket, parent, prefix + child.name, child, child.cid, prefix + child.name, options, cb)
     }
-
-    callback(err, result)
-  })
+  ], callback)
 }

 const createShard = (context, contents, options, callback) => {
@@ -267,4 +277,84 @@ const createShard = (context, contents, options, callback) => {
   )
 }

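+// remove any existing link named `name` from the shard node, add a link
+// named `prefix` pointing at the new child or sub-shard, then re-persist
+// the shard via updateHamtDirectory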
+const updateShardParent = (context, bucket, parent, name, node, cid, prefix, options, callback) => {
+  waterfall([
+    (done) => {
+      if (name) {
+        if (name === prefix) {
+          log(`Updating link ${name} in shard parent`)
+        } else {
+          log(`Removing link ${name} from shard parent, adding link ${prefix}`)
+        }
+
+        return DAGNode.rmLink(parent, name, done)
+      }
+
+      log(`Adding link ${prefix} to shard parent`)
+      done(null, parent)
+    },
+    (parent, done) => DAGNode.addLink(parent, new DAGLink(prefix, node.size, cid), done),
+    (parent, done) => updateHamtDirectory(context, parent.links, bucket, options, done)
+  ], callback)
+}
+
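+// serialise the bucket's bit field into a 'hamt-sharded-directory'
+// UnixFS node with the given links and persist it through IPLD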
+const updateHamtDirectory = (context, links, bucket, options, callback) => {
+  // update parent with new bit field
+  waterfall([
+    (cb) => {
+      const data = Buffer.from(bucket._children.bitField().reverse())
+      const dir = new UnixFS('hamt-sharded-directory', data)
+      dir.fanout = bucket.tableSize()
+      dir.hashType = DirSharded.hashFn.code
+
+      DAGNode.create(dir.marshal(), links, cb)
+    },
+    (parent, done) => {
+      // Persist the new parent DAGNode
+      context.ipld.put(parent, {
+        version: options.cidVersion,
+        format: options.codec,
+        hashAlg: options.hashAlg,
+        hashOnly: !options.flush
+      }, (error, cid) => done(error, {
+        node: parent,
+        cid
+      }))
+    }
+  ], callback)
+}
+
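+// rebuild an in-memory HAMT level from a shard node's links: two-character
+// names become placeholder child buckets, longer names are entries hashed
+// back into the bucket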
+const recreateHamtLevel = (links, callback) => {
+  // recreate this level of the HAMT
+  const bucket = new Bucket({
+    hashFn: DirSharded.hashFn
+  })
+
+  Promise.all(
+    links.map(link => {
+      if (link.name.length === 2) {
+        const pos = parseInt(link.name, 16)
+
+        bucket._putObjectAt(pos, new Bucket({
+          hashFn: DirSharded.hashFn
+        }, bucket, pos))
+
+        return Promise.resolve()
+      }
+
+      return bucket.put(link.name.substring(2), true)
+    })
+  )
+    .then(() => callback(null, bucket))
+    .catch(error => callback(error))
+}
+
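+// find the bucket and position where `name` would be stored, then add it
+// to the bucket so the updated bit field includes the new entry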
+const findPosition = async (name, bucket, callback) => {
+  const position = await bucket._findNewBucketAndPos(name)
+
+  await bucket.put(name, true)
+
+  callback(null, position)
+}
+
 module.exports = addLink