
Commit 376ae40

feat: support adding async iterators
Adds an `ipfs._addAsyncIterator` method for adding async iterators and refactors all add methods to call it. When the Great Async Iterator Migration is complete, this will become the one true method for adding files to IPFS.
1 parent 502efaf commit 376ae40

20 files changed: +497 -713 lines
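
For orientation, here is a minimal, hypothetical sketch of how the new internal method might be consumed (not part of this commit). It assumes `ipfs` is an already-initialised js-ipfs node; the `{ path, content }` input shape and the `{ path, hash, size }` output shape mirror what the diffs below produce.

```js
// Hypothetical usage sketch only: assumes `ipfs` is an initialised js-ipfs node.
// _addAsyncIterator is internal; it yields { path, hash, size } entries.
async function addExample (ipfs) {
  const source = [
    { path: 'hello.txt', content: Buffer.from('hello world') }
  ]

  for await (const file of ipfs._addAsyncIterator(source, { pin: true })) {
    console.log(`added ${file.hash} ${file.path}`)
  }
}
```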

package.json (+6 -4)
@@ -98,8 +98,8 @@
     "ipfs-block-service": "~0.15.2",
     "ipfs-http-client": "^34.0.0",
     "ipfs-http-response": "~0.3.1",
-    "ipfs-mfs": "~0.12.0",
-    "ipfs-multipart": "~0.1.1",
+    "ipfs-mfs": "^0.12.2",
+    "ipfs-multipart": "^0.2.0",
     "ipfs-repo": "~0.26.6",
     "ipfs-unixfs": "~0.1.16",
     "ipfs-unixfs-exporter": "~0.37.7",
@@ -119,6 +119,8 @@
     "is-pull-stream": "~0.0.0",
     "is-stream": "^2.0.0",
     "iso-url": "~0.4.6",
+    "it-pipe": "^1.0.1",
+    "it-to-stream": "^0.1.1",
     "just-safe-set": "^2.1.0",
     "kind-of": "^6.0.2",
     "libp2p": "~0.26.1",
@@ -142,7 +144,7 @@
     "merge-options": "^1.0.1",
     "mime-types": "^2.1.21",
     "mkdirp": "~0.5.1",
-    "mortice": "^1.2.2",
+    "mortice": "^2.0.0",
     "multiaddr": "^6.1.0",
     "multiaddr-to-uri": "^5.0.0",
     "multibase": "~0.6.0",
@@ -194,7 +196,7 @@
     "execa": "^2.0.4",
     "form-data": "^2.5.1",
     "hat": "0.0.3",
-    "interface-ipfs-core": "^0.111.1",
+    "interface-ipfs-core": "^0.113.0",
     "ipfsd-ctl": "~0.46.0",
     "libp2p-websocket-star": "~0.10.2",
     "ncp": "^2.0.0",

src/cli/commands/add.js (+40 -52)
@@ -1,65 +1,19 @@
 'use strict'
 
-const pull = require('pull-stream/pull')
-const through = require('pull-stream/throughs/through')
-const end = require('pull-stream/sinks/on-end')
 const promisify = require('promisify-es6')
 const getFolderSize = promisify(require('get-folder-size'))
 const byteman = require('byteman')
 const mh = require('multihashes')
 const multibase = require('multibase')
-const toPull = require('stream-to-pull-stream')
 const { createProgressBar } = require('../utils')
 const { cidToString } = require('../../utils/cid')
-const globSource = require('../../utils/files/glob-source')
+const globSource = require('ipfs-utils/src/files/glob-source')
 
 async function getTotalBytes (paths) {
   const sizes = await Promise.all(paths.map(p => getFolderSize(p)))
   return sizes.reduce((total, size) => total + size, 0)
 }
 
-function addPipeline (source, addStream, options, log) {
-  let finalHash
-
-  return new Promise((resolve, reject) => {
-    pull(
-      source,
-      addStream,
-      through((file) => {
-        const cid = finalHash = cidToString(file.hash, { base: options.cidBase })
-
-        if (options.silent || options.quieter) {
-          return
-        }
-
-        let message = cid
-
-        if (!options.quiet) {
-          // print the hash twice if we are piping from stdin
-          message = `added ${cid} ${options.file ? file.path || '' : cid}`.trim()
-        }
-
-        log(message)
-      }),
-      end((err) => {
-        if (err) {
-          // Tweak the error message and add more relevant info for the CLI
-          if (err.code === 'ERR_DIR_NON_RECURSIVE') {
-            err.message = `'${err.path}' is a directory, use the '-r' flag to specify directories`
-          }
-          return reject(err)
-        }
-
-        if (options.quieter) {
-          log(finalHash)
-        }
-
-        resolve()
-      })
-    )
-  })
-}
-
 module.exports = {
   command: 'add [file...]',
 
@@ -199,17 +153,51 @@ module.exports = {
       }
 
       const source = argv.file
-        ? globSource(...argv.file, { recursive: argv.recursive })
-        : toPull.source(process.stdin) // Pipe directly to ipfs.add
+        ? globSource(argv.file, { recursive: argv.recursive })
+        : process.stdin // Pipe directly to ipfs.add
 
-      const adder = ipfs.addPullStream(options)
+      let finalHash
 
      try {
-        await addPipeline(source, adder, argv, log)
-      } finally {
+        for await (const file of ipfs._addAsyncIterator(source, options)) {
+          if (argv.silent) {
+            continue
+          }
+
+          if (argv.quieter) {
+            finalHash = file.hash
+            continue
+          }
+
+          const cid = cidToString(file.hash, { base: argv.cidBase })
+          let message = cid
+
+          if (!argv.quiet) {
+            // print the hash twice if we are piping from stdin
+            message = `added ${cid} ${argv.file ? file.path || '' : cid}`.trim()
+          }
+
+          log(message)
+        }
+      } catch (err) {
         if (bar) {
           bar.terminate()
         }
+
+        // Tweak the error message and add more relevant info for the CLI
+        if (err.code === 'ERR_DIR_NON_RECURSIVE') {
+          err.message = `'${err.path}' is a directory, use the '-r' flag to specify directories`
+        }
+
+        throw err
+      }
+
+      if (bar) {
+        bar.terminate()
+      }
+
+      if (argv.quieter) {
+        log(cidToString(finalHash, { base: argv.cidBase }))
       }
     })())
   }
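
The CLI now imports `globSource` from `ipfs-utils` and feeds it (or `process.stdin`) straight into the iterator. A hedged sketch of the assumed input shape: `globSource` is treated here as an async generator yielding `{ path, content }` entries, which is what `normalise-input` in the new pipeline expects.

```js
const globSource = require('ipfs-utils/src/files/glob-source')

// Assumed behaviour: one entry per matched path, where `content` (when
// present) is itself an async iterable of the file's bytes.
async function listEntries () {
  for await (const entry of globSource(['./docs'], { recursive: true })) {
    console.log(entry.path)
  }
}

listEntries()
```
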
New file (+148 -0)

@@ -0,0 +1,148 @@
+'use strict'
+
+const importer = require('ipfs-unixfs-importer')
+const normaliseAddInput = require('ipfs-utils/src/files/normalise-input')
+const { parseChunkerString } = require('./utils')
+const pipe = require('it-pipe')
+const log = require('debug')('ipfs:add')
+log.error = require('debug')('ipfs:add:error')
+
+function noop () {}
+
+module.exports = function (self) {
+  // Internal add func that gets used by all add funcs
+  return async function * addAsyncIterator (source, options) {
+    options = options || {}
+
+    const chunkerOptions = parseChunkerString(options.chunker)
+
+    const opts = Object.assign({}, {
+      shardSplitThreshold: self._options.EXPERIMENTAL.sharding
+        ? 1000
+        : Infinity
+    }, options, {
+      chunker: chunkerOptions.chunker,
+      chunkerOptions: chunkerOptions.chunkerOptions
+    })
+
+    // CID v0 is for multihashes encoded with sha2-256
+    if (opts.hashAlg && opts.cidVersion !== 1) {
+      opts.cidVersion = 1
+    }
+
+    let total = 0
+
+    const prog = opts.progress || noop
+    const progress = (bytes) => {
+      total += bytes
+      prog(total)
+    }
+
+    opts.progress = progress
+
+    const iterator = pipe(
+      normaliseAddInput(source),
+      doImport(self, opts),
+      transformFile(self, opts),
+      preloadFile(self, opts),
+      pinFile(self, opts)
+    )
+
+    const releaseLock = await self._gcLock.readLock()
+
+    try {
+      yield * iterator
+    } finally {
+      releaseLock()
+    }
+  }
+}
+
+function doImport (ipfs, opts) {
+  return async function * (source) { // eslint-disable-line require-await
+    yield * importer(source, ipfs._ipld, opts)
+  }
+}
+
+function transformFile (ipfs, opts) {
+  return async function * (source) {
+    for await (const file of source) {
+      let cid = file.cid
+      const hash = cid.toBaseEncodedString()
+      let path = file.path ? file.path : hash
+
+      if (opts.wrapWithDirectory && !file.path) {
+        path = ''
+      }
+
+      if (opts.onlyHash) {
+        yield {
+          path,
+          hash,
+          size: file.unixfs.fileSize()
+        }
+
+        return
+      }
+
+      const node = await ipfs.object.get(file.cid, Object.assign({}, opts, { preload: false }))
+
+      if (opts.cidVersion === 1) {
+        cid = cid.toV1()
+      }
+
+      let size = node.size
+
+      if (Buffer.isBuffer(node)) {
+        size = node.length
+      }
+
+      yield {
+        path,
+        hash,
+        size
+      }
+    }
+  }
+}
+
+function preloadFile (ipfs, opts) {
+  return async function * (source) {
+    for await (const file of source) {
+      const isRootFile = !file.path || opts.wrapWithDirectory
+        ? file.path === ''
+        : !file.path.includes('/')
+
+      const shouldPreload = isRootFile && !opts.onlyHash && opts.preload !== false
+
+      if (shouldPreload) {
+        ipfs._preload(file.hash)
+      }
+
+      yield file
+    }
+  }
+}
+
+function pinFile (ipfs, opts) {
+  return async function * (source) {
+    for await (const file of source) {
+      // Pin a file if it is the root dir of a recursive add or the single file
+      // of a direct add.
+      const pin = 'pin' in opts ? opts.pin : true
+      const isRootDir = !file.path.includes('/')
+      const shouldPin = pin && isRootDir && !opts.onlyHash && !opts.hashAlg
+
+      if (shouldPin) {
+        // Note: addAsyncIterator() has already taken a GC lock, so tell
+        // pin.add() not to take a (second) GC lock
+        await ipfs.pin.add(file.hash, {
+          preload: false,
+          lock: false
+        })
+      }
+
+      yield file
+    }
+  }
+}
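
The commit message says the remaining add methods were refactored to call this generator; those files are among the 20 changed but are not shown in this excerpt. A hedged sketch of what such a wrapper could look like, using the `it-to-stream` dependency added above (the function name and options are assumptions, not taken from the diff):

```js
'use strict'

// Illustrative sketch only: a readable-stream flavoured add method that
// defers to the async-iterator implementation added in this commit.
const toStream = require('it-to-stream')

module.exports = function (self) {
  return function addReadableStream (options) {
    options = options || {}

    // it-to-stream's transform() wraps an async-iterable transform as a
    // Node.js transform stream, so stream-based callers keep working.
    return toStream.transform(
      source => self._addAsyncIterator(source, options),
      { objectMode: true }
    )
  }
}
```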
