-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathstream_client.go
237 lines (210 loc) · 5.82 KB
/
stream_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
/**********************************************************\
| |
| hprose |
| |
| Official WebSite: http://www.hprose.com/ |
| http://www.hprose.org/ |
| |
\**********************************************************/
/**********************************************************\
* *
* hprose/stream_client.go *
* *
* hprose stream client for Go. *
* *
* LastModified: May 27, 2015 *
* Authors: Ma Bingyao <[email protected]> *
* Ore_Ash <[email protected]> *
* *
\**********************************************************/
package hprose
import (
"net"
"sync"
"time"
)
// StreamClient is base struct for TcpClient and UnixClient
type StreamClient struct {
*BaseClient
timeout interface{}
readBuffer interface{}
readTimeout interface{}
writeBuffer interface{}
writeTimeout interface{}
}
func newStreamClient(trans Transporter) (client *StreamClient) {
client = new(StreamClient)
client.BaseClient = NewBaseClient(trans)
return
}
type streamConnStatus int
const (
free = streamConnStatus(iota)
using
closing
)
// ConnEntry is the connection entry in connection pool
type ConnEntry interface {
Get() net.Conn
Set(conn net.Conn)
Close()
}
type streamConnEntry struct {
uri string
conn net.Conn
status streamConnStatus
lastUsedTime time.Time
}
// NewStreamConnEntry is the constructor for StreamConnEntry
func NewStreamConnEntry(uri string) ConnEntry {
entry := new(streamConnEntry)
entry.uri = uri
entry.status = using
entry.lastUsedTime = time.Now()
return entry
}
// Get the connection
func (connEntry *streamConnEntry) Get() net.Conn {
return connEntry.conn
}
// Set the connection
func (connEntry *streamConnEntry) Set(conn net.Conn) {
if conn != nil {
connEntry.conn = conn
}
}
// Close the connection
func (connEntry *streamConnEntry) Close() {
connEntry.status = closing
}
// ConnPool is the connection pool
type ConnPool interface {
Timeout() time.Duration
SetTimeout(d time.Duration)
Get(uri string) ConnEntry
Close(uri string)
Free(entry ConnEntry)
}
type streamConnPool struct {
sync.Mutex
pool []*streamConnEntry
timer *time.Ticker
timeout time.Duration
}
// NewStreamConnPool is the constructor for StreamConnPool
func NewStreamConnPool(num int) ConnPool {
pool := new(streamConnPool)
pool.pool = make([]*streamConnEntry, 0, num)
return pool
}
func freeConns(conns []net.Conn) {
for _, conn := range conns {
conn.Close()
}
}
// Timeout return the timeout of the connection in pool
func (connPool *streamConnPool) Timeout() time.Duration {
return connPool.timeout
}
// SetTimeout for connection in pool
func (connPool *streamConnPool) SetTimeout(d time.Duration) {
if connPool.timer != nil {
connPool.timer.Stop()
connPool.timer = nil
}
connPool.timeout = d
if d > 0 {
connPool.timer = time.NewTicker(d)
go connPool.closeTimeoutConns()
}
}
func (connPool *streamConnPool) closeTimeoutConns() {
for t := range connPool.timer.C {
func() {
connPool.Lock()
defer connPool.Unlock()
conns := make([]net.Conn, 0, len(connPool.pool))
for _, entry := range connPool.pool {
if entry.uri != "" &&
entry.status == free &&
entry.conn != nil &&
t.After(entry.lastUsedTime.Add(connPool.timeout)) {
conns = append(conns, entry.conn)
entry.conn = nil
entry.uri = ""
}
}
go freeConns(conns)
}()
}
}
// Get the StreamConnEntry in StreamConnPool
func (connPool *streamConnPool) Get(uri string) ConnEntry {
connPool.Lock()
defer connPool.Unlock()
for _, entry := range connPool.pool {
if entry.status == free {
if entry.uri == uri {
entry.status = using
return entry
} else if entry.uri == "" {
entry.status = using
entry.uri = uri
return entry
}
}
}
entry := NewStreamConnEntry(uri)
connPool.pool = append(connPool.pool, entry.(*streamConnEntry))
return entry
}
// Close the specify uri connections in StreamConnPool
func (connPool *streamConnPool) Close(uri string) {
connPool.Lock()
defer connPool.Unlock()
conns := make([]net.Conn, 0, len(connPool.pool))
for _, entry := range connPool.pool {
if entry.uri == uri {
if entry.status == free {
conns = append(conns, entry.conn)
entry.conn = nil
entry.uri = ""
} else {
entry.Close()
}
}
}
go freeConns(conns)
}
// Free the entry to pool
func (connPool *streamConnPool) Free(entry ConnEntry) {
if entry, ok := entry.(*streamConnEntry); ok {
if entry.status == closing {
if entry.conn != nil {
go entry.conn.Close()
entry.conn = nil
}
entry.uri = ""
}
entry.lastUsedTime = time.Now()
entry.status = free
} else {
panic("entry is not an instance of *StreamConnEntry")
}
}
// SetReadBuffer sets the size of the operating system's receive buffer associated with the connection.
func (client *StreamClient) SetReadBuffer(bytes int) {
client.readBuffer = bytes
}
// SetReadTimeout for stream client
func (client *StreamClient) SetReadTimeout(d time.Duration) {
client.readTimeout = d
}
// SetWriteBuffer sets the size of the operating system's transmit buffer associated with the connection.
func (client *StreamClient) SetWriteBuffer(bytes int) {
client.writeBuffer = bytes
}
// SetWriteTimeout for stream client
func (client *StreamClient) SetWriteTimeout(d time.Duration) {
client.writeTimeout = d
}