@@ -8,14 +8,21 @@ const dirtyChai = require('dirty-chai')
8
8
const expect = chai . expect
9
9
chai . use ( dirtyChai )
10
10
11
+ const base64url = require ( 'base64url' )
12
+ const { fromB58String } = require ( 'multihashes' )
11
13
const parallel = require ( 'async/parallel' )
14
+ const retry = require ( 'async/retry' )
15
+ const series = require ( 'async/series' )
12
16
13
17
const isNode = require ( 'detect-node' )
18
+ const ipns = require ( 'ipns' )
14
19
const IPFS = require ( '../../src' )
20
+ const waitFor = require ( '../utils/wait-for' )
15
21
16
22
const DaemonFactory = require ( 'ipfsd-ctl' )
17
23
const df = DaemonFactory . create ( { type : 'proc' } )
18
24
25
+ const namespace = '/record/'
19
26
const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU'
20
27
21
28
describe ( 'name-pubsub' , function ( ) {
@@ -76,19 +83,56 @@ describe('name-pubsub', function () {
76
83
it ( 'should publish and then resolve correctly' , function ( done ) {
77
84
this . timeout ( 80 * 1000 )
78
85
86
+ let subscribed = false
87
+
88
+ function checkMessage ( msg ) {
89
+ subscribed = true
90
+ }
91
+
92
+ const alreadySubscribed = ( cb ) => {
93
+ return cb ( null , subscribed === true )
94
+ }
95
+
96
+ // Wait until a peer subscribes a topic
97
+ const waitForPeerToSubscribe = ( node , topic , callback ) => {
98
+ retry ( {
99
+ times : 5 ,
100
+ interval : 2000
101
+ } , ( next ) => {
102
+ node . pubsub . peers ( topic , ( error , res ) => {
103
+ if ( error ) {
104
+ return next ( error )
105
+ }
106
+
107
+ if ( ! res || ! res . length ) {
108
+ return next ( new Error ( 'Could not find subscription' ) )
109
+ }
110
+
111
+ return next ( null , res )
112
+ } )
113
+ } , callback )
114
+ }
115
+
116
+ const keys = ipns . getIdKeys ( fromB58String ( idA . id ) )
117
+ const topic = `${ namespace } ${ base64url . encode ( keys . routingKey . toBuffer ( ) ) } `
118
+
79
119
nodeB . name . resolve ( idA . id , ( err ) => {
80
120
expect ( err ) . to . exist ( )
81
121
82
- nodeA . name . publish ( ipfsRef , { resolve : false } , ( err , res ) => {
122
+ series ( [
123
+ ( cb ) => waitForPeerToSubscribe ( nodeA , topic , cb ) ,
124
+ ( cb ) => nodeB . pubsub . subscribe ( topic , checkMessage , cb ) ,
125
+ ( cb ) => nodeA . name . publish ( ipfsRef , { resolve : false } , cb ) ,
126
+ ( cb ) => waitFor ( ( callback ) => alreadySubscribed ( callback ) , cb ) ,
127
+ ( cb ) => setTimeout ( ( ) => cb ( ) , 1000 ) , // guarantee record is written
128
+ ( cb ) => nodeB . name . resolve ( idA . id , cb )
129
+ ] , ( err , res ) => {
83
130
expect ( err ) . to . not . exist ( )
84
131
expect ( res ) . to . exist ( )
85
132
86
- nodeB . name . resolve ( idA . id , ( err , res ) => {
87
- expect ( err ) . to . not . exist ( )
88
- expect ( res ) . to . exist ( )
89
- expect ( res . path ) . to . equal ( ipfsRef )
90
- done ( )
91
- } )
133
+ expect ( res [ 5 ] ) . to . exist ( )
134
+ expect ( res [ 5 ] . path ) . to . equal ( ipfsRef )
135
+ done ( )
92
136
} )
93
137
} )
94
138
} )
0 commit comments