@@ -7,100 +7,144 @@ module.exports = async function * trickleReduceToRoot (source, reduce, options)
7
7
}
8
8
9
9
async function trickleStream ( source , reduce , options ) {
10
- let root = {
11
- children : [ ]
12
- }
13
- let node = root
10
+ let root
11
+ let iteration = 0
14
12
let maxDepth = 1
15
- let currentDepth = 1
16
- let layerSize = 0
13
+ let subTree = root = new Root ( options . layerRepeat )
17
14
18
15
for await ( const layer of batch ( source , options . maxChildrenPerNode ) ) {
19
- node . data = layer
20
-
21
- let parent = node . parent || root
22
- const nextNode = {
23
- children : [ ]
24
- }
16
+ if ( subTree . isFull ( ) ) {
17
+ if ( subTree !== root ) {
18
+ root . addChild ( await subTree . reduce ( reduce ) )
19
+ }
25
20
26
- if ( currentDepth < maxDepth ) {
27
- // the current layer can't have more children
28
- // but we can descend a layer
29
- node . children . push ( nextNode )
30
- nextNode . parent = node
31
- node = nextNode
32
- currentDepth ++
33
- } else if ( parent . children . length < options . layerRepeat ) {
34
- // the current layer can have more children
35
- parent . children . push ( nextNode )
36
- nextNode . parent = parent
37
- node = nextNode
38
- } else if ( currentDepth === maxDepth ) {
39
- // hit the bottom of the current iteration, can we find a sibling?
40
- parent = findNext ( root , 0 , maxDepth , options )
41
-
42
- if ( parent ) {
43
- nextNode . parent = parent
44
- parent . children . push ( nextNode )
45
- node = nextNode
46
- } else {
47
- if ( layerSize === 0 ) {
48
- maxDepth ++
49
- }
50
-
51
- layerSize ++
52
-
53
- if ( layerSize === options . layerRepeat ) {
54
- layerSize = 0
55
- }
56
-
57
- nextNode . parent = root
58
- root . children . push ( nextNode )
59
- node = nextNode
60
-
61
- currentDepth = 1
21
+ if ( iteration && iteration % options . layerRepeat === 0 ) {
22
+ maxDepth ++
62
23
}
24
+
25
+ subTree = new SubTree ( maxDepth , options . layerRepeat , iteration )
26
+
27
+ iteration ++
63
28
}
29
+
30
+ subTree . append ( layer )
64
31
}
65
32
66
- // reduce to root
67
- return walk ( root , reduce )
33
+ if ( subTree && subTree !== root ) {
34
+ root . addChild ( await subTree . reduce ( reduce ) )
35
+ }
36
+
37
+ return root . reduce ( reduce )
68
38
}
69
39
70
- const walk = async ( node , reduce ) => {
71
- let children = [ ]
40
+ class SubTree {
41
+ constructor ( maxDepth , layerRepeat , iteration ) {
42
+ this . maxDepth = maxDepth
43
+ this . layerRepeat = layerRepeat
44
+ this . currentDepth = 1
45
+ this . iteration = iteration
46
+
47
+ this . root = this . node = this . parent = {
48
+ children : [ ] ,
49
+ depth : this . currentDepth ,
50
+ maxDepth,
51
+ maxChildren : ( this . maxDepth - this . currentDepth ) * this . layerRepeat
52
+ }
53
+ }
54
+
55
+ isFull ( ) {
56
+ if ( ! this . root . data ) {
57
+ return false
58
+ }
59
+
60
+ if ( this . currentDepth < this . maxDepth && this . node . maxChildren ) {
61
+ // can descend
62
+ this . _addNextNodeToParent ( this . node )
63
+
64
+ return false
65
+ }
66
+
67
+ // try to find new node from node.parent
68
+ const distantRelative = this . _findParent ( this . node , this . currentDepth )
69
+
70
+ if ( distantRelative ) {
71
+ this . _addNextNodeToParent ( distantRelative )
72
+
73
+ return false
74
+ }
72
75
73
- if ( node . children . length ) {
74
- children = await Promise . all (
75
- node . children
76
- . filter ( child => child . data )
77
- . map ( child => walk ( child , reduce ) )
78
- )
76
+ return true
79
77
}
80
78
81
- return reduce ( node . data . concat ( children ) )
82
- }
79
+ _addNextNodeToParent ( parent ) {
80
+ this . parent = parent
81
+
82
+ // find site for new node
83
+ const nextNode = {
84
+ children : [ ] ,
85
+ depth : parent . depth + 1 ,
86
+ parent,
87
+ maxDepth : this . maxDepth ,
88
+ maxChildren : Math . floor ( parent . children . length / this . layerRepeat ) * this . layerRepeat
89
+ }
90
+
91
+ parent . children . push ( nextNode )
83
92
84
- const findNext = ( node , depth , maxDepth , options ) => {
85
- if ( depth === maxDepth ) {
86
- return
93
+ this . currentDepth = nextNode . depth
94
+ this . node = nextNode
87
95
}
88
96
89
- let nodeMatches = false
97
+ append ( layer ) {
98
+ this . node . data = layer
99
+ }
100
+
101
+ reduce ( reduce ) {
102
+ return this . _reduce ( this . root , reduce )
103
+ }
104
+
105
+ async _reduce ( node , reduce ) {
106
+ let children = [ ]
107
+
108
+ if ( node . children . length ) {
109
+ children = await Promise . all (
110
+ node . children
111
+ . filter ( child => child . data )
112
+ . map ( child => this . _reduce ( child , reduce ) )
113
+ )
114
+ }
90
115
91
- if ( node . children . length < options . layerRepeat ) {
92
- nodeMatches = true
116
+ return reduce ( node . data . concat ( children ) )
93
117
}
94
118
95
- if ( node . children . length ) {
96
- const childMatches = findNext ( node . children [ node . children . length - 1 ] , depth + 1 , maxDepth , options )
119
+ _findParent ( node , depth ) {
120
+ const parent = node . parent
121
+
122
+ if ( ! parent || parent . depth === 0 ) {
123
+ return
124
+ }
97
125
98
- if ( childMatches ) {
99
- return childMatches
126
+ if ( parent . children . length === parent . maxChildren || ! parent . maxChildren ) {
127
+ // this layer is full, may be able to traverse to a different branch
128
+ return this . _findParent ( parent , depth )
100
129
}
130
+
131
+ return parent
132
+ }
133
+ }
134
+
135
+ class Root extends SubTree {
136
+ constructor ( layerRepeat ) {
137
+ super ( 0 , layerRepeat )
138
+
139
+ this . root . depth = 0
140
+ this . currentDepth = 1
141
+ }
142
+
143
+ addChild ( child ) {
144
+ this . root . children . push ( child )
101
145
}
102
146
103
- if ( nodeMatches ) {
104
- return node
147
+ reduce ( reduce ) {
148
+ return reduce ( this . root . data . concat ( this . root . children ) )
105
149
}
106
150
}
0 commit comments