@@ -847,7 +847,11 @@ mod tests {
847
847
LocalListener :: new ( format ! ( "{}_{}" , self . muxer. host_sock_path, port) )
848
848
}
849
849
850
- fn local_connect ( & mut self , peer_port : u32 ) -> ( UnixStream , u32 ) {
850
+ fn local_connect_maybe_upgrade (
851
+ & mut self ,
852
+ peer_port : u32 ,
853
+ is_upgrade : bool ,
854
+ ) -> ( UnixStream , u32 ) {
851
855
let ( init_local_lsn_count, init_conn_lsn_count) = self . count_epoll_listeners ( ) ;
852
856
853
857
let mut stream = UnixStream :: connect ( self . muxer . host_sock_path . clone ( ) ) . unwrap ( ) ;
@@ -861,7 +865,11 @@ mod tests {
861
865
let ( local_lsn_count, _) = self . count_epoll_listeners ( ) ;
862
866
assert_eq ! ( local_lsn_count, init_local_lsn_count + 1 ) ;
863
867
864
- let buf = format ! ( "CONNECT {}\n " , peer_port) ;
868
+ let buf = if is_upgrade {
869
+ format ! ( "Upgrade {}\n " , peer_port)
870
+ } else {
871
+ format ! ( "CONNECT {}\n " , peer_port)
872
+ } ;
865
873
stream. write_all ( buf. as_bytes ( ) ) . unwrap ( ) ;
866
874
// The muxer would now get notified that data is available for reading from the locally
867
875
// initiated connection.
@@ -895,6 +903,14 @@ mod tests {
895
903
896
904
( stream, local_port)
897
905
}
906
+
907
+ fn local_connect ( & mut self , peer_port : u32 ) -> ( UnixStream , u32 ) {
908
+ self . local_connect_maybe_upgrade ( peer_port, false )
909
+ }
910
+
911
+ fn local_connect_with_upgrade ( & mut self , peer_port : u32 ) -> ( UnixStream , u32 ) {
912
+ self . local_connect_maybe_upgrade ( peer_port, true )
913
+ }
898
914
}
899
915
900
916
struct LocalListener {
@@ -1071,6 +1087,40 @@ mod tests {
1071
1087
assert_eq ! ( ctx. pkt. buf( ) . unwrap( ) [ ..data. len( ) ] , data) ;
1072
1088
}
1073
1089
1090
+ #[ test]
1091
+ fn test_local_connection_with_upgrade ( ) {
1092
+ let mut ctx = MuxerTestContext :: new ( "local_connection_with_upgrade" ) ;
1093
+ let peer_port = 1025 ;
1094
+ let ( mut stream, local_port) = ctx. local_connect_with_upgrade ( peer_port) ;
1095
+
1096
+ // Test the handshake
1097
+ let mut buf = vec ! [ 0 ; 4 ] ;
1098
+ stream. read_exact ( buf. as_mut_slice ( ) ) . unwrap ( ) ;
1099
+ let buf = String :: from_utf8 ( buf) . unwrap ( ) ;
1100
+ assert_eq ! ( buf, "101\n " . to_string( ) ) ;
1101
+
1102
+ // Test guest -> host data flow.
1103
+ let data = [ 1 , 2 , 3 , 4 ] ;
1104
+ ctx. init_data_pkt ( local_port, peer_port, & data) ;
1105
+ ctx. send ( ) ;
1106
+
1107
+ let mut buf = vec ! [ 0u8 ; data. len( ) ] ;
1108
+ stream. read_exact ( buf. as_mut_slice ( ) ) . unwrap ( ) ;
1109
+ assert_eq ! ( buf. as_slice( ) , & data) ;
1110
+
1111
+ // Test host -> guest data flow.
1112
+ let data = [ 5 , 6 , 7 , 8 ] ;
1113
+ stream. write_all ( & data) . unwrap ( ) ;
1114
+ ctx. notify_muxer ( ) ;
1115
+
1116
+ assert ! ( ctx. muxer. has_pending_rx( ) ) ;
1117
+ ctx. recv ( ) ;
1118
+ assert_eq ! ( ctx. pkt. op( ) , uapi:: VSOCK_OP_RW ) ;
1119
+ assert_eq ! ( ctx. pkt. src_port( ) , local_port) ;
1120
+ assert_eq ! ( ctx. pkt. dst_port( ) , peer_port) ;
1121
+ assert_eq ! ( ctx. pkt. buf( ) . unwrap( ) [ ..data. len( ) ] , data) ;
1122
+ }
1123
+
1074
1124
#[ test]
1075
1125
fn test_local_close ( ) {
1076
1126
let peer_port = 1025 ;
@@ -1104,6 +1154,46 @@ mod tests {
1104
1154
assert ! ( !ctx. muxer. local_port_set. contains( & local_port) ) ;
1105
1155
}
1106
1156
1157
+ #[ test]
1158
+ fn test_local_close_with_upgrade ( ) {
1159
+ let peer_port = 1025 ;
1160
+ let mut ctx = MuxerTestContext :: new ( "local_close_with_upgrade" ) ;
1161
+ let local_port;
1162
+ {
1163
+ let ( mut stream, local_port_) = ctx. local_connect_with_upgrade ( peer_port) ;
1164
+
1165
+ // Test the handshake
1166
+ let mut buf = vec ! [ 0 ; 4 ] ;
1167
+ stream. read_exact ( buf. as_mut_slice ( ) ) . unwrap ( ) ;
1168
+ let buf = String :: from_utf8 ( buf) . unwrap ( ) ;
1169
+ assert_eq ! ( buf, "101\n " . to_string( ) ) ;
1170
+
1171
+ local_port = local_port_;
1172
+ }
1173
+ // Local var `_stream` was now dropped, thus closing the local stream. After the muxer gets
1174
+ // notified via EPOLLIN, it should attempt to gracefully shutdown the connection, issuing a
1175
+ // VSOCK_OP_SHUTDOWN with both no-more-send and no-more-recv indications set.
1176
+ ctx. notify_muxer ( ) ;
1177
+ assert ! ( ctx. muxer. has_pending_rx( ) ) ;
1178
+ ctx. recv ( ) ;
1179
+ assert_eq ! ( ctx. pkt. op( ) , uapi:: VSOCK_OP_SHUTDOWN ) ;
1180
+ assert_ne ! ( ctx. pkt. flags( ) & uapi:: VSOCK_FLAGS_SHUTDOWN_SEND , 0 ) ;
1181
+ assert_ne ! ( ctx. pkt. flags( ) & uapi:: VSOCK_FLAGS_SHUTDOWN_RCV , 0 ) ;
1182
+ assert_eq ! ( ctx. pkt. src_port( ) , local_port) ;
1183
+ assert_eq ! ( ctx. pkt. dst_port( ) , peer_port) ;
1184
+
1185
+ // The connection should get removed (and its local port freed), after the peer replies
1186
+ // with an RST.
1187
+ ctx. init_pkt ( local_port, peer_port, uapi:: VSOCK_OP_RST ) ;
1188
+ ctx. send ( ) ;
1189
+ let key = ConnMapKey {
1190
+ local_port,
1191
+ peer_port,
1192
+ } ;
1193
+ assert ! ( !ctx. muxer. conn_map. contains_key( & key) ) ;
1194
+ assert ! ( !ctx. muxer. local_port_set. contains( & local_port) ) ;
1195
+ }
1196
+
1107
1197
#[ test]
1108
1198
fn test_peer_close ( ) {
1109
1199
let peer_port = 1025 ;
0 commit comments