hs-test: vpp connect proxy stress tests

Type: test

Change-Id: Ie0b4e2d5f6d8ac19e86599f5f9ecbb642c3027ea
Signed-off-by: Matus Fabian <matfabia@cisco.com>
diff --git a/extras/hs-test/infra/suite_vpp_proxy.go b/extras/hs-test/infra/suite_vpp_proxy.go
index 37d251a..252d01e 100644
--- a/extras/hs-test/infra/suite_vpp_proxy.go
+++ b/extras/hs-test/infra/suite_vpp_proxy.go
@@ -7,8 +7,10 @@
 
 import (
 	"fmt"
+	"net"
 	"reflect"
 	"runtime"
+	"strconv"
 	"strings"
 
 	. "github.com/onsi/ginkgo/v2"
@@ -70,7 +72,9 @@
 	s.HstSuite.SetupTest()
 
 	// VPP HTTP connect-proxy
-	vpp, err := s.Containers.VppProxy.newVppInstance(s.Containers.VppProxy.AllocatedCpus)
+	var memoryConfig Stanza
+	memoryConfig.NewStanza("memory").Append("main-heap-size 2G")
+	vpp, err := s.Containers.VppProxy.newVppInstance(s.Containers.VppProxy.AllocatedCpus, memoryConfig)
 	s.AssertNotNil(vpp, fmt.Sprint(err))
 
 	s.AssertNil(vpp.Start())
@@ -176,6 +180,37 @@
 	s.AssertNotContains(log, "Upgrade:")
 }
 
+func handleConn(conn net.Conn) {
+	defer conn.Close()
+	buf := make([]byte, 1500)
+	for {
+		n, err := conn.Read(buf)
+		if err != nil {
+			break
+		}
+		_, err = conn.Write(buf[:n])
+		if err != nil {
+			break
+		}
+	}
+}
+
+func (s *VppProxySuite) StartEchoServer() *net.TCPListener {
+	listener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(s.ServerAddr()), Port: int(s.ServerPort())})
+	s.AssertNil(err, fmt.Sprint(err))
+	go func() {
+		for {
+			conn, err := listener.Accept()
+			if err != nil {
+				continue
+			}
+			go handleConn(conn)
+		}
+	}()
+	s.Log("* started tcp echo server " + s.ServerAddr() + ":" + strconv.Itoa(int(s.ServerPort())))
+	return listener
+}
+
 var _ = Describe("VppProxySuite", Ordered, ContinueOnFailure, func() {
 	var s VppProxySuite
 	BeforeAll(func() {
diff --git a/extras/hs-test/proxy_test.go b/extras/hs-test/proxy_test.go
index 0d1aa3b..3afdc31 100644
--- a/extras/hs-test/proxy_test.go
+++ b/extras/hs-test/proxy_test.go
@@ -1,8 +1,18 @@
 package main
 
 import (
+	"bufio"
+	"bytes"
+	"errors"
 	"fmt"
+	"io"
+	"math/rand"
+	"net"
+	"net/http"
+	"os"
 	"strconv"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	. "fd.io/hs-test/infra"
@@ -12,7 +22,8 @@
 func init() {
 	RegisterVppProxyTests(VppProxyHttpGetTcpTest, VppProxyHttpGetTlsTest, VppProxyHttpPutTcpTest, VppProxyHttpPutTlsTest,
 		VppConnectProxyGetTest, VppConnectProxyPutTest)
-	RegisterVppProxySoloTests(VppProxyHttpGetTcpMTTest, VppProxyHttpPutTcpMTTest, VppProxyTcpIperfMTTest, VppProxyUdpIperfMTTest)
+	RegisterVppProxySoloTests(VppProxyHttpGetTcpMTTest, VppProxyHttpPutTcpMTTest, VppProxyTcpIperfMTTest,
+		VppProxyUdpIperfMTTest, VppConnectProxyStressTest, VppConnectProxyStressMTTest)
 	RegisterVppUdpProxyTests(VppProxyUdpTest)
 	RegisterEnvoyProxyTests(EnvoyProxyHttpGetTcpTest, EnvoyProxyHttpPutTcpTest)
 	RegisterNginxProxyTests(NginxMirroringTest)
@@ -183,6 +194,148 @@
 	s.CurlUploadResourceViaTunnel(targetUri, proxyUri, CurlContainerTestFile)
 }
 
+func vppConnectProxyStressLoad(s *VppProxySuite, proxyPort string) {
+	var (
+		connectError, timeout, readError, writeError, invalidData, total atomic.Uint32
+		wg                                                               sync.WaitGroup
+	)
+	stop := make(chan struct{})
+	targetUri := fmt.Sprintf("%s:%d", s.ServerAddr(), s.ServerPort())
+	s.Log("Running 30s test @ " + targetUri)
+
+	for i := 0; i < 1000; i++ {
+		wg.Add(1)
+		go func() {
+			var tot, timed, re, we uint32
+			defer wg.Done()
+			defer func() {
+				total.Add(tot)
+				timeout.Add(timed)
+				readError.Add(re)
+				writeError.Add(we)
+			}()
+		connRestart:
+			conn, err := net.DialTimeout("tcp", s.VppProxyAddr()+":"+proxyPort, time.Second*10)
+			if err != nil {
+				connectError.Add(1)
+				return
+			}
+			defer conn.Close()
+
+			conn.SetDeadline(time.Now().Add(time.Second * 5))
+
+			var b bytes.Buffer
+			fmt.Fprintf(&b, "CONNECT %s HTTP/1.1\r\n", targetUri)
+			fmt.Fprintf(&b, "Host: %s\r\n", s.ServerAddr())
+			fmt.Fprintf(&b, "User-Agent: hs-test\r\n")
+			io.WriteString(&b, "\r\n")
+			_, err = conn.Write(b.Bytes())
+			if err != nil {
+				connectError.Add(1)
+				return
+			}
+			r := bufio.NewReader(conn)
+			resp, err := http.ReadResponse(r, nil)
+			if err != nil {
+				connectError.Add(1)
+				return
+			}
+			resp.Body.Close()
+			if resp.StatusCode != http.StatusOK {
+				connectError.Add(1)
+				return
+			}
+
+			req := make([]byte, 64)
+			rand.Read(req)
+
+			for {
+				select {
+				default:
+					conn.SetDeadline(time.Now().Add(time.Second * 5))
+					tot += 1
+					_, e := conn.Write(req)
+					if e != nil {
+						if errors.Is(e, os.ErrDeadlineExceeded) {
+							timed += 1
+						} else {
+							we += 1
+						}
+						continue
+					}
+					reply := make([]byte, 1024)
+					n, e := conn.Read(reply)
+					if e != nil {
+						if errors.Is(e, os.ErrDeadlineExceeded) {
+							timed += 1
+						} else {
+							re += 1
+						}
+						conn.Close()
+						goto connRestart
+					}
+					if bytes.Compare(req, reply[:n]) != 0 {
+						invalidData.Add(1)
+						conn.Close()
+						goto connRestart
+					}
+				case <-stop:
+					return
+				}
+			}
+
+		}()
+	}
+	for i := 0; i < 30; i++ {
+		GinkgoWriter.Print(".")
+		time.Sleep(time.Second)
+	}
+	GinkgoWriter.Print("\n")
+	close(stop) // tell clients to stop
+	wg.Wait()   // wait until clients finish
+	successRatio := (float64(total.Load()-(timeout.Load()+readError.Load()+writeError.Load()+invalidData.Load())) / float64(total.Load())) * 100.0
+	summary := fmt.Sprintf("1000 connections %d requests in 30s", total.Load())
+	report := fmt.Sprintf("Requests/sec: %d\n", total.Load()/30)
+	report += fmt.Sprintf("Errors: timeout %d, read %d, write %d, invalid data received %d, connection %d\n", timeout.Load(), readError.Load(), writeError.Load(), invalidData.Load(), connectError.Load())
+	report += fmt.Sprintf("Successes ratio: %.2f%%\n", successRatio)
+	AddReportEntry(summary, report)
+	s.AssertGreaterThan(successRatio, 90.0)
+}
+
+func VppConnectProxyStressTest(s *VppProxySuite) {
+	var proxyPort uint16 = 8080
+	remoteServerConn := s.StartEchoServer()
+	defer remoteServerConn.Close()
+
+	configureVppProxy(s, "http", proxyPort)
+
+	// no goVPP less noise
+	s.Containers.VppProxy.VppInstance.Disconnect()
+
+	vppConnectProxyStressLoad(s, strconv.Itoa(int(proxyPort)))
+}
+
+func VppConnectProxyStressMTTest(s *VppProxySuite) {
+	var proxyPort uint16 = 8080
+	remoteServerConn := s.StartEchoServer()
+	defer remoteServerConn.Close()
+
+	vppProxy := s.Containers.VppProxy.VppInstance
+	// tap interfaces are created on test setup with 1 rx-queue,
+	// need to recreate them with 2 + consistent-qp
+	s.AssertNil(vppProxy.DeleteTap(s.Interfaces.Server))
+	s.AssertNil(vppProxy.CreateTap(s.Interfaces.Server, 2, uint32(s.Interfaces.Server.Peer.Index), Consistent_qp))
+	s.AssertNil(vppProxy.DeleteTap(s.Interfaces.Client))
+	s.AssertNil(vppProxy.CreateTap(s.Interfaces.Client, 2, uint32(s.Interfaces.Client.Peer.Index), Consistent_qp))
+
+	configureVppProxy(s, "http", proxyPort)
+
+	// no goVPP less noise
+	vppProxy.Disconnect()
+
+	vppConnectProxyStressLoad(s, strconv.Itoa(int(proxyPort)))
+}
+
 func VppProxyUdpTest(s *VppUdpProxySuite) {
 	remoteServerConn := s.StartEchoServer()
 	defer remoteServerConn.Close()