vapi: support services

Add missing support for service definitions that declare a stream message:

service { rpc X_get returns X_get_reply stream X_details; }

Type: improvement

Change-Id: I27555f61a2974e414cb6554f32c550b8ee5eb037
Signed-off-by: Stanislav Zaikin <stanislav.zaikin@46labs.com>
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
diff --git a/src/vpp-api/vapi/vapi.c b/src/vpp-api/vapi/vapi.c
index 7700eb0..45241e1 100644
--- a/src/vpp-api/vapi/vapi.c
+++ b/src/vpp-api/vapi/vapi.c
@@ -63,7 +63,8 @@
   u32 context;
   vapi_cb_t callback;
   void *callback_ctx;
-  bool is_dump;
+  vapi_msg_id_t response_id;
+  enum vapi_request_type type;
 } vapi_req_t;
 
 static const u32 context_counter_mask = (1 << 31);
@@ -137,15 +138,17 @@
 }
 
 void
-vapi_store_request (vapi_ctx_t ctx, u32 context, bool is_dump,
-		    vapi_cb_t callback, void *callback_ctx)
+vapi_store_request (vapi_ctx_t ctx, u32 context, vapi_msg_id_t response_id,
+		    enum vapi_request_type request_type, vapi_cb_t callback,
+		    void *callback_ctx)
 {
   assert (!vapi_requests_full (ctx));
   /* if the mutex is not held, bad things will happen */
   assert (0 != pthread_mutex_trylock (&ctx->requests_mutex));
   const int requests_end = vapi_requests_end (ctx);
   vapi_req_t *slot = &ctx->requests[requests_end];
-  slot->is_dump = is_dump;
+  slot->type = request_type;
+  slot->response_id = response_id;
   slot->context = context;
   slot->callback = callback;
   slot->callback_ctx = callback_ctx;
@@ -1116,8 +1119,34 @@
       int payload_offset = vapi_get_payload_offset (id);
       void *payload = ((u8 *) msg) + payload_offset;
       bool is_last = true;
-      if (ctx->requests[tmp].is_dump)
+      switch (ctx->requests[tmp].type)
 	{
+	case VAPI_REQUEST_STREAM:
+	  if (ctx->requests[tmp].response_id == id)
+	    {
+	      is_last = false;
+	    }
+	  else
+	    {
+	      VAPI_DBG ("Stream response ID doesn't match current ID, move to "
+			"next ID");
+	      clib_memset (&ctx->requests[tmp], 0,
+			   sizeof (ctx->requests[tmp]));
+	      ++ctx->requests_start;
+	      --ctx->requests_count;
+	      if (ctx->requests_start == ctx->requests_size)
+		{
+		  ctx->requests_start = 0;
+		}
+	      tmp = ctx->requests_start;
+	      if (ctx->requests[tmp].context != context)
+		{
+		  VAPI_ERR ("Unexpected context %u, expected context %u!",
+			    ctx->requests[tmp].context, context);
+		}
+	    }
+	  break;
+	case VAPI_REQUEST_DUMP:
 	  if (vapi_msg_id_control_ping_reply == id)
 	    {
 	      payload = NULL;
@@ -1126,6 +1155,9 @@
 	    {
 	      is_last = false;
 	    }
+	  break;
+	case VAPI_REQUEST_REG:
+	  break;
 	}
       if (payload_offset != -1)
 	{
diff --git a/src/vpp-api/vapi/vapi.hpp b/src/vpp-api/vapi/vapi.hpp
index a1e33a9..58d1706 100644
--- a/src/vpp-api/vapi/vapi.hpp
+++ b/src/vpp-api/vapi/vapi.hpp
@@ -140,6 +140,10 @@
 
   template <typename Req, typename Resp, typename... Args> friend class Dump;
 
+  template <typename Req, typename Resp, typename StreamMessage,
+	    typename... Args>
+  friend class Stream;
+
   template <typename M> friend class Event_registration;
 };
 
@@ -451,6 +455,10 @@
 
   template <typename Req, typename Resp, typename... Args> friend class Dump;
 
+  template <typename Req, typename Resp, typename StreamMessage,
+	    typename... Args>
+  friend class Stream;
+
   template <typename M> friend class Result_set;
 
   template <typename M> friend class Event_registration;
@@ -497,6 +505,10 @@
 
 template <typename Req, typename Resp, typename... Args> class Dump;
 
+template <typename Req, typename Resp, typename StreamMessage,
+	  typename... Args>
+class Stream;
+
 template <class, class = void> struct vapi_has_payload_trait : std::false_type
 {
 };
@@ -627,6 +639,10 @@
 
   template <typename Req, typename Resp, typename... Args> friend class Dump;
 
+  template <typename Req, typename Resp, typename StreamMessage,
+	    typename... Args>
+  friend class Stream;
+
   template <typename X> friend class Event_registration;
 
   template <typename X> friend class Result_set;
@@ -772,12 +788,96 @@
   bool complete;
   std::vector<Msg<M>, typename Msg<M>::Msg_allocator> set;
 
+  template <typename Req, typename Resp, typename StreamMessage,
+	    typename... Args>
+  friend class Stream;
+
   template <typename Req, typename Resp, typename... Args> friend class Dump;
 
   template <typename X> friend class Event_registration;
 };
 
 /**
+ * Class representing an RPC stream request - zero or more identical stream
+ * messages followed by a single reply, all in answer to one request message
+ */
+template <typename Req, typename Resp, typename StreamMessage,
+	  typename... Args>
+class Stream : public Common_req
+{
+public:
+  Stream (
+    Connection &con, Args... args,
+    std::function<vapi_error_e (Stream<Req, Resp, StreamMessage, Args...> &)>
+      cb = nullptr)
+      : Common_req{ con }, request{ con, vapi_alloc<Req> (con, args...) },
+	response{ con, nullptr }, result_set{ con }, callback{ cb }
+  {
+  }
+
+  Stream (const Stream &) = delete;
+
+  virtual ~Stream () {}
+
+  virtual std::tuple<vapi_error_e, bool>
+  assign_response (vapi_msg_id_t id, void *shm_data)
+  {
+    if (id == response.get_msg_id ())
+      {
+	response.assign_response (id, shm_data);
+	result_set.mark_complete ();
+	set_response_state (RESPONSE_READY);
+	if (nullptr != callback)
+	  {
+	    return std::make_pair (callback (*this), true);
+	  }
+	return std::make_pair (VAPI_OK, true);
+      }
+    else
+      {
+	result_set.assign_response (id, shm_data);
+      }
+    return std::make_pair (VAPI_OK, false);
+  }
+
+  vapi_error_e
+  execute ()
+  {
+    return con.send (this);
+  }
+
+  const Msg<Req> &
+  get_request (void)
+  {
+    return request;
+  }
+
+  const Msg<Resp> &
+  get_response (void)
+  {
+    return response;
+  }
+
+  using resp_type = typename Msg<StreamMessage>::shm_data_type;
+
+  const Result_set<StreamMessage> &
+  get_result_set (void) const
+  {
+    return result_set;
+  }
+
+private:
+  Msg<Req> request;
+  Msg<Resp> response;
+  Result_set<StreamMessage> result_set;
+  std::function<vapi_error_e (Stream<Req, Resp, StreamMessage, Args...> &)>
+    callback;
+
+  friend class Connection;
+  friend class Result_set<StreamMessage>;
+};
+
+/**
  * Class representing a dump request - zero or more identical responses to a
  * single request message
  */
diff --git a/src/vpp-api/vapi/vapi_c_gen.py b/src/vpp-api/vapi/vapi_c_gen.py
index 84d9aca..37f5ac1 100755
--- a/src/vpp-api/vapi/vapi_c_gen.py
+++ b/src/vpp-api/vapi/vapi_c_gen.py
@@ -477,7 +477,7 @@
                 {{
                   VAPI_ERR("Truncated '{self.name}' msg received, received %lu"
                     "bytes, expected %lu bytes.", buf_size,
-                    sizeof({self.get_calc_msg_size_func_name()}));
+                    {self.get_calc_msg_size_func_name()}(msg));
                   return -1;
                 }}
               return 0;
@@ -615,45 +615,66 @@
         return "vapi_%s" % self.name
 
     def get_op_func_decl(self):
-        if self.reply.has_payload():
-            return "vapi_error_e %s(%s)" % (
-                self.get_op_func_name(),
-                ",\n  ".join(
-                    [
-                        "struct vapi_ctx_s *ctx",
-                        "%s *msg" % self.get_c_name(),
-                        "vapi_error_e (*callback)(struct vapi_ctx_s *ctx",
-                        "                         void *callback_ctx",
-                        "                         vapi_error_e rv",
-                        "                         bool is_last",
-                        "                         %s *reply)"
-                        % self.reply.get_payload_struct_name(),
-                        "void *callback_ctx",
-                    ]
-                ),
-            )
-        else:
-            return "vapi_error_e %s(%s)" % (
-                self.get_op_func_name(),
-                ",\n  ".join(
-                    [
-                        "struct vapi_ctx_s *ctx",
-                        "%s *msg" % self.get_c_name(),
-                        "vapi_error_e (*callback)(struct vapi_ctx_s *ctx",
-                        "                         void *callback_ctx",
-                        "                         vapi_error_e rv",
-                        "                         bool is_last)",
-                        "void *callback_ctx",
-                    ]
-                ),
-            )
+        stream_param_lines = []
+        if self.has_stream_msg:
+            stream_param_lines = [
+                "vapi_error_e (*details_callback)(struct vapi_ctx_s *ctx",
+                "                                 void *callback_ctx",
+                "                                 vapi_error_e rv",
+                "                                 bool is_last",
+                "                                 %s *details)"
+                % self.stream_msg.get_payload_struct_name(),
+                "void *details_callback_ctx",
+            ]
+
+        return "vapi_error_e %s(%s)" % (
+            self.get_op_func_name(),
+            ",\n  ".join(
+                [
+                    "struct vapi_ctx_s *ctx",
+                    "%s *msg" % self.get_c_name(),
+                    "vapi_error_e (*reply_callback)(struct vapi_ctx_s *ctx",
+                    "                               void *callback_ctx",
+                    "                               vapi_error_e rv",
+                    "                               bool is_last",
+                    "                               %s *reply)"
+                    % self.reply.get_payload_struct_name(),
+                ]
+                + [
+                    "void *reply_callback_ctx",
+                ]
+                + stream_param_lines
+            ),
+        )
 
     def get_op_func_def(self):
+        param_check_lines = ["  if (!msg || !reply_callback) {"]
+        store_request_lines = [
+            "    vapi_store_request(ctx, req_context, %s, %s, "
+            % (
+                self.reply.get_msg_id_name(),
+                "VAPI_REQUEST_DUMP" if self.reply_is_stream else "VAPI_REQUEST_REG",
+            ),
+            "                       (vapi_cb_t)reply_callback, reply_callback_ctx);",
+        ]
+        if self.has_stream_msg:
+            param_check_lines = [
+                "  if (!msg || !reply_callback || !details_callback) {"
+            ]
+            store_request_lines = [
+                f"    vapi_store_request(ctx, req_context, {self.stream_msg.get_msg_id_name()}, VAPI_REQUEST_STREAM, ",
+                "                       (vapi_cb_t)details_callback, details_callback_ctx);",
+                f"    vapi_store_request(ctx, req_context, {self.reply.get_msg_id_name()}, VAPI_REQUEST_REG, ",
+                "                       (vapi_cb_t)reply_callback, reply_callback_ctx);",
+            ]
+
         return "\n".join(
             [
                 "%s" % self.get_op_func_decl(),
                 "{",
-                "  if (!msg || !callback) {",
+            ]
+            + param_check_lines
+            + [
                 "    return VAPI_EINVAL;",
                 "  }",
                 "  if (vapi_is_nonblocking(ctx) && vapi_requests_full(ctx)) {",
@@ -669,14 +690,12 @@
                 (
                     "  if (VAPI_OK == (rv = vapi_send_with_control_ping "
                     "(ctx, msg, req_context))) {"
-                    if self.reply_is_stream
+                    if (self.reply_is_stream and not self.has_stream_msg)
                     else "  if (VAPI_OK == (rv = vapi_send (ctx, msg))) {"
                 ),
-                (
-                    "    vapi_store_request(ctx, req_context, %s, "
-                    "(vapi_cb_t)callback, callback_ctx);"
-                    % ("true" if self.reply_is_stream else "false")
-                ),
+            ]
+            + store_request_lines
+            + [
                 "    if (VAPI_OK != vapi_producer_unlock (ctx)) {",
                 "      abort (); /* this really shouldn't happen */",
                 "    }",
@@ -792,6 +811,8 @@
             emit_definition(parser, json_file, emitted, x)
     if hasattr(o, "reply"):
         emit_definition(parser, json_file, emitted, o.reply)
+    if hasattr(o, "stream_msg"):
+        emit_definition(parser, json_file, emitted, o.stream_msg)
     if hasattr(o, "get_c_def"):
         if (
             o not in parser.enums_by_json[json_file]
@@ -820,14 +841,14 @@
             print("%s%s" % (function_attrs, o.get_calc_msg_size_func_def()))
             print("")
             print("%s%s" % (function_attrs, o.get_verify_msg_size_func_def()))
-            if not o.is_reply and not o.is_event:
+            if not o.is_reply and not o.is_event and not o.is_stream:
                 print("")
                 print("%s%s" % (function_attrs, o.get_alloc_func_def()))
                 print("")
                 print("%s%s" % (function_attrs, o.get_op_func_def()))
             print("")
             print("%s" % o.get_c_constructor())
-            if o.is_reply or o.is_event:
+            if (o.is_reply or o.is_event) and not o.is_stream:
                 print("")
                 print("%s%s;" % (function_attrs, o.get_event_cb_func_def()))
         elif hasattr(o, "get_swap_to_be_func_def"):
diff --git a/src/vpp-api/vapi/vapi_c_test.c b/src/vpp-api/vapi/vapi_c_test.c
index 99a93fb..5eccb0f 100644
--- a/src/vpp-api/vapi/vapi_c_test.c
+++ b/src/vpp-api/vapi/vapi_c_test.c
@@ -28,6 +28,7 @@
 #include <vapi/vlib.api.vapi.h>
 #include <vapi/vpe.api.vapi.h>
 #include <vapi/interface.api.vapi.h>
+#include <vapi/mss_clamp.api.vapi.h>
 #include <vapi/l2.api.vapi.h>
 #include <fake.api.vapi.h>
 
@@ -36,6 +37,7 @@
 
 DEFINE_VAPI_MSG_IDS_VPE_API_JSON;
 DEFINE_VAPI_MSG_IDS_INTERFACE_API_JSON;
+DEFINE_VAPI_MSG_IDS_MSS_CLAMP_API_JSON;
 DEFINE_VAPI_MSG_IDS_L2_API_JSON;
 DEFINE_VAPI_MSG_IDS_FAKE_API_JSON;
 
@@ -481,6 +483,48 @@
   return VAPI_OK;
 }
 
+vapi_error_e
+vapi_mss_clamp_enable_disable_reply_cb (
+  struct vapi_ctx_s *ctx, void *callback_ctx, vapi_error_e rv, bool is_last,
+  vapi_payload_mss_clamp_enable_disable_reply *reply)
+{
+  bool *x = callback_ctx;
+  *x = true;
+  return VAPI_OK;
+}
+
+vapi_error_e
+vapi_mss_clamp_get_reply_cb (struct vapi_ctx_s *ctx, void *callback_ctx,
+			     vapi_error_e rv, bool is_last,
+			     vapi_payload_mss_clamp_get_reply *reply)
+{
+  int *counter = callback_ctx;
+  ck_assert_int_gt (*counter, 0); // make sure details were called first
+  ++*counter;
+  ck_assert_int_eq (is_last, true);
+  printf ("Got mss clamp reply error %d\n", rv);
+  ck_assert_int_eq (rv, VAPI_OK);
+  printf ("counter is %d", *counter);
+  return VAPI_OK;
+}
+
+vapi_error_e
+vapi_mss_clamp_get_details_cb (struct vapi_ctx_s *ctx, void *callback_ctx,
+			       vapi_error_e rv, bool is_last,
+			       vapi_payload_mss_clamp_details *details)
+{
+  int *counter = callback_ctx;
+  ++*counter;
+  if (!is_last)
+    {
+      printf ("Got ipv4 mss clamp to %u for sw_if_index %u\n",
+	      details->ipv4_mss, details->sw_if_index);
+      ck_assert_int_eq (details->ipv4_mss, 1000 + details->sw_if_index);
+    }
+  printf ("counter is %d", *counter);
+  return VAPI_OK;
+}
+
 START_TEST (test_loopbacks_1)
 {
   printf ("--- Create/delete loopbacks using blocking API ---\n");
@@ -521,6 +565,37 @@
 	      mac_addresses[i][3], mac_addresses[i][4], mac_addresses[i][5],
 	      sw_if_indexes[i]);
     }
+
+  { // new context
+    for (int i = 0; i < num_ifs; ++i)
+      {
+	vapi_msg_mss_clamp_enable_disable *mc =
+	  vapi_alloc_mss_clamp_enable_disable (ctx);
+	mc->payload.sw_if_index = sw_if_indexes[i];
+	mc->payload.ipv4_mss = 1000 + sw_if_indexes[i];
+	mc->payload.ipv4_direction = MSS_CLAMP_DIR_RX;
+	bool reply_ctx = false;
+	printf ("Set ipv4 mss clamp to %u for sw_if_index %u\n",
+		mc->payload.ipv4_mss, mc->payload.sw_if_index);
+	vapi_error_e rv = vapi_mss_clamp_enable_disable (
+	  ctx, mc, vapi_mss_clamp_enable_disable_reply_cb, &reply_ctx);
+	ck_assert_int_eq (VAPI_OK, rv);
+	ck_assert_int_eq (reply_ctx, true);
+      }
+  }
+
+  { // new context
+    int counter = 0;
+    vapi_msg_mss_clamp_get *msg = vapi_alloc_mss_clamp_get (ctx);
+    msg->payload.sw_if_index = ~0;
+    vapi_error_e rv =
+      vapi_mss_clamp_get (ctx, msg, vapi_mss_clamp_get_reply_cb, &counter,
+			  vapi_mss_clamp_get_details_cb, &counter);
+    printf ("counter is %d", counter);
+    ck_assert_int_eq (VAPI_OK, rv);
+    ck_assert_int_eq (counter, num_ifs + 1);
+  }
+
   bool seen[num_ifs];
   sw_interface_dump_ctx dctx = { false, num_ifs, sw_if_indexes, seen, 0 };
   vapi_msg_sw_interface_dump *dump;
diff --git a/src/vpp-api/vapi/vapi_cpp_gen.py b/src/vpp-api/vapi/vapi_cpp_gen.py
index 33744a3..165730c 100755
--- a/src/vpp-api/vapi/vapi_cpp_gen.py
+++ b/src/vpp-api/vapi/vapi_cpp_gen.py
@@ -96,6 +96,13 @@
         return "%s%s" % (self.name[0].upper(), self.name[1:])
 
     def get_req_template_name(self):
+        if self.has_stream_msg:
+            return "Stream<%s, %s, %s>" % (
+                self.get_c_name(),
+                self.reply.get_c_name(),
+                self.stream_msg.get_c_name(),
+            )
+
         if self.reply_is_stream:
             template = "Dump"
         else:
@@ -196,7 +203,7 @@
             print("/* m.get_cpp_constructor() */")
         print("%s" % m.get_cpp_constructor())
         print("")
-        if not m.is_reply and not m.is_event:
+        if not m.is_reply and not m.is_event and not m.is_stream:
             if add_debug_comments:
                 print("/* m.get_alloc_template_instantiation() */")
             print("%s" % m.get_alloc_template_instantiation())
@@ -210,6 +217,8 @@
                 print("/* m.get_reply_type_alias() */")
             print("%s" % m.get_reply_type_alias())
             continue
+        if m.is_stream:
+            continue
         if add_debug_comments:
             print("/* m.get_req_template_instantiation() */")
         print("%s" % m.get_req_template_instantiation())
diff --git a/src/vpp-api/vapi/vapi_cpp_test.cpp b/src/vpp-api/vapi/vapi_cpp_test.cpp
index c0e0cdc..25df5b7 100644
--- a/src/vpp-api/vapi/vapi_cpp_test.cpp
+++ b/src/vpp-api/vapi/vapi_cpp_test.cpp
@@ -25,10 +25,12 @@
 #include <vapi/vapi.hpp>
 #include <vapi/vpe.api.vapi.hpp>
 #include <vapi/interface.api.vapi.hpp>
+#include <vapi/mss_clamp.api.vapi.hpp>
 #include <fake.api.vapi.hpp>
 
 DEFINE_VAPI_MSG_IDS_VPE_API_JSON;
 DEFINE_VAPI_MSG_IDS_INTERFACE_API_JSON;
+DEFINE_VAPI_MSG_IDS_MSS_CLAMP_API_JSON;
 DEFINE_VAPI_MSG_IDS_FAKE_API_JSON;
 
 static char *app_name = nullptr;
@@ -145,6 +147,51 @@
     }
 
   { // new context
+    for (int i = 0; i < num_ifs; ++i)
+      {
+	Mss_clamp_enable_disable d (con);
+	auto &req = d.get_request ().get_payload ();
+	req.sw_if_index = sw_if_indexes[i];
+	req.ipv4_mss = 1420;
+	req.ipv4_direction = vapi_enum_mss_clamp_dir::MSS_CLAMP_DIR_RX;
+	auto rv = d.execute ();
+	ck_assert_int_eq (VAPI_OK, rv);
+	WAIT_FOR_RESPONSE (d, rv);
+	ck_assert_int_eq (VAPI_OK, rv);
+      }
+  }
+
+  { // new context
+    bool seen[num_ifs] = { 0 };
+    Mss_clamp_get d (con);
+    d.get_request ().get_payload ().sw_if_index = ~0;
+    auto rv = d.execute ();
+    ck_assert_int_eq (VAPI_OK, rv);
+    WAIT_FOR_RESPONSE (d, rv);
+    ck_assert_int_eq (VAPI_OK, rv);
+    auto &rs = d.get_result_set ();
+    for (auto &r : rs)
+      {
+	auto &p = r.get_payload ();
+	ck_assert_int_eq (p.ipv4_mss, 1420);
+	printf ("tcp-clamp: sw_if_idx %u ip4-mss %d dir %d\n", p.sw_if_index,
+		p.ipv4_mss, p.ipv4_direction);
+	for (int i = 0; i < num_ifs; ++i)
+	  {
+	    if (sw_if_indexes[i] == p.sw_if_index)
+	      {
+		ck_assert_int_eq (0, seen[i]);
+		seen[i] = true;
+	      }
+	  }
+      }
+    for (int i = 0; i < num_ifs; ++i)
+      {
+	ck_assert_int_eq (1, seen[i]);
+      }
+  }
+
+  { // new context
     bool seen[num_ifs] = {0};
     Sw_interface_dump d (con);
     auto rv = d.execute ();
diff --git a/src/vpp-api/vapi/vapi_internal.h b/src/vpp-api/vapi/vapi_internal.h
index 49c0417..ca47dd1 100644
--- a/src/vpp-api/vapi/vapi_internal.h
+++ b/src/vpp-api/vapi/vapi_internal.h
@@ -118,8 +118,18 @@
 size_t vapi_get_request_count (vapi_ctx_t ctx);
 size_t vapi_get_max_request_count (vapi_ctx_t ctx);
 u32 vapi_gen_req_context (vapi_ctx_t ctx);
-void vapi_store_request (vapi_ctx_t ctx, u32 context, bool is_dump,
-			 vapi_cb_t callback, void *callback_ctx);
+
+enum vapi_request_type
+{
+  VAPI_REQUEST_REG = 0,
+  VAPI_REQUEST_DUMP = 1,
+  VAPI_REQUEST_STREAM = 2,
+};
+
+void vapi_store_request (vapi_ctx_t ctx, u32 context,
+			 vapi_msg_id_t response_id,
+			 enum vapi_request_type type, vapi_cb_t callback,
+			 void *callback_ctx);
 int vapi_get_payload_offset (vapi_msg_id_t id);
 void (*vapi_get_swap_to_host_func (vapi_msg_id_t id)) (void *payload);
 void (*vapi_get_swap_to_be_func (vapi_msg_id_t id)) (void *payload);
diff --git a/src/vpp-api/vapi/vapi_json_parser.py b/src/vpp-api/vapi/vapi_json_parser.py
index 4f29f95..00c234f 100644
--- a/src/vpp-api/vapi/vapi_json_parser.py
+++ b/src/vpp-api/vapi/vapi_json_parser.py
@@ -158,6 +158,7 @@
         self.header = None
         self.is_reply = json_parser.is_reply(self.name)
         self.is_event = json_parser.is_event(self.name)
+        self.is_stream = json_parser.is_stream(self.name)
         fields = []
         for header in get_msg_header_defs(
             struct_type_class, field_class, json_parser, logger
@@ -346,6 +347,7 @@
         self.types["string"] = simple_type_class("vl_api_string_t")
         self.replies = set()
         self.events = set()
+        self.streams = set()
         self.simple_type_class = simple_type_class
         self.enum_class = enum_class
         self.union_class = union_class
@@ -384,6 +386,8 @@
                 if "events" in self.services[k]:
                     for x in self.services[k]["events"]:
                         self.events.add(x)
+                if "stream_msg" in self.services[k]:
+                    self.streams.add(self.services[k]["stream_msg"])
             for e in j["enums"]:
                 name = e[0]
                 value_pairs = e[1:-1]
@@ -521,6 +525,20 @@
     def is_event(self, message):
         return message in self.events
 
+    def is_stream(self, message):
+        return message in self.streams
+
+    def has_stream_msg(self, message):
+        return (
+            message.name in self.services
+            and "stream_msg" in self.services[message.name]
+        )
+
+    def get_stream_msg(self, message):
+        if not self.has_stream_msg(message):
+            return None
+        return self.messages[self.services[message.name]["stream_msg"]]
+
     def get_reply(self, message):
         return self.messages[self.services[message]["reply"]]
 
@@ -532,13 +550,15 @@
             remove = []
             for n, m in j.items():
                 try:
-                    if not m.is_reply and not m.is_event:
+                    if not m.is_reply and not m.is_event and not m.is_stream:
                         try:
                             m.reply = self.get_reply(n)
+                            m.reply_is_stream = False
+                            m.has_stream_msg = self.has_stream_msg(m)
                             if "stream" in self.services[m.name]:
                                 m.reply_is_stream = self.services[m.name]["stream"]
-                            else:
-                                m.reply_is_stream = False
+                            if m.has_stream_msg:
+                                m.stream_msg = self.get_stream_msg(m)
                             m.reply.request = m
                         except:
                             raise ParseError("Cannot find reply to message `%s'" % n)