@@ -5,7 +5,6 @@ const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR
5
5
const PeerId = require ( 'peer-id' )
6
6
const pull = require ( 'pull-stream' )
7
7
const Pushable = require ( 'pull-pushable' )
8
- const waterfall = require ( 'async/waterfall' )
9
8
10
9
const log = debug ( 'jsipfs:pingPullStream' )
11
10
log . error = debug ( 'jsipfs:pingPullStream:error' )
@@ -20,15 +19,20 @@ module.exports = function pingPullStream (self) {
20
19
21
20
const source = Pushable ( )
22
21
23
- waterfall ( [
24
- ( cb ) => getPeer ( self . _libp2pNode , source , peerId , cb ) ,
25
- ( peer , cb ) => runPing ( self . _libp2pNode , source , opts . count , peer , cb )
26
- ] , ( err ) => {
22
+ getPeer ( self . _libp2pNode , source , peerId , ( err , peer ) => {
27
23
if ( err ) {
28
24
log . error ( err )
29
- source . push ( getPacket ( { success : false , text : err . toString ( ) } ) )
30
25
source . end ( err )
26
+ return
31
27
}
28
+
29
+ runPing ( self . _libp2pNode , source , opts . count , peer , ( err ) => {
30
+ if ( err ) {
31
+ log . error ( err )
32
+ source . push ( getPacket ( { success : false , text : err . toString ( ) } ) )
33
+ source . end ( )
34
+ }
35
+ } )
32
36
} )
33
37
34
38
return source
@@ -41,45 +45,40 @@ function getPacket (msg) {
41
45
return Object . assign ( basePacket , msg )
42
46
}
43
47
44
- function getPeer ( libp2pNode , statusStream , peerId , cb ) {
45
- let peer
48
+ function getPeer ( libp2pNode , statusStream , peerIdStr , cb ) {
49
+ let peerId
50
+
51
+ try {
52
+ peerId = PeerId . createFromB58String ( peerIdStr )
53
+ } catch ( err ) {
54
+ return cb ( err )
55
+ }
56
+
57
+ let peerInfo
46
58
47
59
try {
48
- peer = libp2pNode . peerBook . get ( peerId )
60
+ peerInfo = libp2pNode . peerBook . get ( peerId )
49
61
} catch ( err ) {
50
62
log ( 'Peer not found in peer book, trying peer routing' )
51
- // Share lookup status just as in the go implemmentation
52
- statusStream . push ( getPacket ( { text : `Looking up peer ${ peerId } ` } ) )
53
-
54
- // Try to use peerRouting
55
- try {
56
- peerId = PeerId . createFromB58String ( peerId )
57
- } catch ( err ) {
58
- return cb ( Object . assign ( err , {
59
- message : `failed to parse peer address '${ peerId } ': input isn't valid multihash`
60
- } ) )
61
- }
62
63
64
+ // Share lookup status just as in the go implemmentation
65
+ statusStream . push ( getPacket ( { text : `Looking up peer ${ peerIdStr } ` } ) )
63
66
return libp2pNode . peerRouting . findPeer ( peerId , cb )
64
67
}
65
68
66
- cb ( null , peer )
69
+ cb ( null , peerInfo )
67
70
}
68
71
69
72
function runPing ( libp2pNode , statusStream , count , peer , cb ) {
70
73
libp2pNode . ping ( peer , ( err , p ) => {
71
- if ( err ) {
72
- return cb ( err )
73
- }
74
-
75
- log ( 'Got peer' , peer )
74
+ if ( err ) { return cb ( err ) }
76
75
77
76
let packetCount = 0
78
77
let totalTime = 0
79
78
statusStream . push ( getPacket ( { text : `PING ${ peer . id . toB58String ( ) } ` } ) )
80
79
81
80
p . on ( 'ping' , ( time ) => {
82
- statusStream . push ( getPacket ( { time : time } ) )
81
+ statusStream . push ( getPacket ( { time } ) )
83
82
totalTime += time
84
83
packetCount ++
85
84
if ( packetCount >= count ) {
@@ -93,12 +92,9 @@ function runPing (libp2pNode, statusStream, count, peer, cb) {
93
92
p . on ( 'error' , ( err ) => {
94
93
log . error ( err )
95
94
p . stop ( )
96
- statusStream . push ( getPacket ( { success : false , text : err . toString ( ) } ) )
97
- statusStream . end ( err )
95
+ cb ( err )
98
96
} )
99
97
100
98
p . start ( )
101
-
102
- return cb ( )
103
99
} )
104
100
}
0 commit comments