// Writing a basic HTTP server is easy using the
// `net/http` package.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/enriquebris/goconcurrentqueue"
	"github.com/gorilla/mux"
)

// Note: consumer 'group' and consumer 'user' both uses hardcoded values specific to this interface
//    globalCounters      var holding the "global counters"
//      recieved          number of received messages from all topics                             (int)
//      sent              number of sent messages to all topics                                   (int)
//    topics              var holding all topic related info
//      <topic-name>      name of a topic (present after topic is created)
//        content-type    data type of the topic                                                  (string)
//        counters
//          recieved      number of received messages from the topic                              (int)
//          sent          number of sent messages to the topic                                    (int)
//        messages
//          send          messages waiting to be sent (set when sending is started)               (fifo)
//          received      received messages waiting to be fetched (set when reception is started) (fifo)

type counter struct {
	c uint64
}

func (c *counter) step() {
	atomic.AddUint64(&c.c, 1)
}

func (c counter) get() uint64 {
	return atomic.LoadUint64(&c.c)
}

type counters struct {
	received counter
	sent     counter
}

func newCounters() counters {
	return counters{
		received: counter{},
		sent:     counter{},
	}
}

type messages struct {
	send     *goconcurrentqueue.FIFO
	received *goconcurrentqueue.FIFO
}

func (m *messages) startSend() bool {
	if m.send == nil {
		m.send = goconcurrentqueue.NewFIFO()
		return true
	}
	return false
}

func (m *messages) stopSend() {
	m.send = nil
}

func (m *messages) addToSend(msg string) error {
	if m.send == nil {
		return fmt.Errorf("sending not started")
	}
	m.send.Lock()
	defer m.send.Unlock()
	return m.send.Enqueue(msg)
}

func (m *messages) getToSend() (interface{}, error) {
	if m.send == nil {
		return "", fmt.Errorf("sending not started")
	}
	m.send.Lock()
	defer m.send.Unlock()
	return m.send.Dequeue()
}

func (m *messages) startReceive() bool {
	if m.received == nil {
		m.received = goconcurrentqueue.NewFIFO()
		return true
	}
	return false
}

func (m *messages) stopReceive() {
	m.send = nil
}

type topic struct {
	contentType string
	counters    counters
	messages    messages
}

func newTopic(ct string) *topic {
	return &topic{
		contentType: ct,
		counters:    counters{},
		messages:    messages{},
	}
}

var globalCounters counters
var topics map[string]*topic = make(map[string]*topic)

var bootstrapserver = ""

func initApp() {
	bootstrapserver = os.Getenv("KAFKA_BOOTSTRAP_SERVER")
	if len(bootstrapserver) == 0 {
		fmt.Println("Fatal error: env var KAFKA_BOOTSTRAP_SERVER not set")
		fmt.Println("Exiting... ")
		os.Exit(1)
	}
	fmt.Println("Using KAFKA_BOOTSTRAP_SERVER=" + bootstrapserver)
}

//Helper function to get a created topic, if it exists
func getTopicFromRequest(w http.ResponseWriter, req *http.Request) (*topic, string, bool) {
	topicId := mux.Vars(req)["topic"]
	t, exist := topics[topicId]
	if exist == false {
		w.WriteHeader(http.StatusNotFound)
		fmt.Fprintf(w, "Topic %v does not exist", topicId)
		return nil, "", false
	}
	return t, topicId, true
}

// Alive check
// GET on /
func healthCheck(w http.ResponseWriter, req *http.Request) {
	fmt.Fprintf(w, "OK")
}

// Deep reset of this interface stub - no removal of msgs or topics in kafka
// POST on /reset
func allreset(w http.ResponseWriter, req *http.Request) {
	for _, v := range topics {
		v.messages.stopSend()
		v.messages.stopReceive()
	}
	time.Sleep(5 * time.Second) //Allow producers/consumers to shut down
	globalCounters = newCounters()
	topics = make(map[string]*topic)
	fmt.Fprintf(w, "OK")
}

// Get topics, return json array of strings of topics created by this interface stub
// Returns json array of strings, array is empty if no topics exist
// GET on /topics

func getTopics(w http.ResponseWriter, req *http.Request) {
	topicKeys := make([]string, 0, len(topics))
	fmt.Printf("len topics: %v\n", len(topics))
	for k := range topics {
		topicKeys = append(topicKeys, k)
	}
	var j, err = json.Marshal(topicKeys)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Cannot convert list of topics to json, error details: %v", err)
		return
	} else {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write(j)
	}
}

func writeOkRepsonse(w http.ResponseWriter, httpStatus int, msg string) {
	w.WriteHeader(httpStatus)
	w.Header().Set("Content-Type", "text/plain")
	fmt.Fprintf(w, msg)
}

// Get a counter value
// GET /topics/counters/{counter}
func getCounter(w http.ResponseWriter, req *http.Request) {
	ctr := mux.Vars(req)["counter"]
	var ctrvalue = -1
	if ctr == "received" {
		ctrvalue = int(globalCounters.received.get())
	} else if ctr == "sent" {
		ctrvalue = int(globalCounters.sent.get())
	}

	if ctrvalue == -1 {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "Counter %v does not exist", ctr)
		return
	}
	writeOkRepsonse(w, http.StatusOK, strconv.Itoa(ctrvalue))
	return

}

// Create a topic
// PUT on /topics/<topic>?type=<type>    type shall be 'application/json' or 'text/plain'
func createTopic(w http.ResponseWriter, req *http.Request) {
	topicId := mux.Vars(req)["topic"]
	topicType := mux.Vars(req)["type"]

	fmt.Printf("Creating topic: %v, content type: %v\n", topicId, topicType)

	if len(topicType) == 0 {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "Type not specified")
		return
	}

	tid, exist := topics[topicId]
	if exist == true {
		if tid.contentType != topicType {
			w.WriteHeader(http.StatusBadRequest)
			fmt.Fprintf(w, "Topic type exist but type is different, queue content type: %v, requested content type: %v", tid.contentType, topicType)
			return
		}
		writeOkRepsonse(w, http.StatusOK, "Topic exist")
		return
	}

	t := newTopic(topicType)

	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
	defer func() { a.Close() }()
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Cannot create client to bootstrap server: "+bootstrapserver+", error details: %v", err)
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	maxDur := 10 * time.Second

	_, err = a.CreateTopics(
		ctx,
		[]kafka.TopicSpecification{{
			Topic:             topicId,
			NumPartitions:     1,
			ReplicationFactor: 1}},
		kafka.SetAdminOperationTimeout(maxDur))

	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Failed to create topic: %v, error details: %v", topicId, err)
		return
	}
	topics[topicId] = t
	w.WriteHeader(http.StatusCreated)
	fmt.Fprintf(w, "Topic created")
}

// Get topic type
// GET on /topic/<topic>
func getTopic(w http.ResponseWriter, req *http.Request) {
	t, _, exist := getTopicFromRequest(w, req)
	if !exist {
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, t.contentType)
}

// Get a topics counter value
// GET /topics/{topic}/counters/{counter}
func getTopicCounter(w http.ResponseWriter, req *http.Request) {
	t, _, exist := getTopicFromRequest(w, req)
	if !exist {
		return
	}
	ctr := mux.Vars(req)["counter"]

	var ctrvalue = -1
	if ctr == "received" {
		ctrvalue = int(t.counters.received.get())
	} else if ctr == "sent" {
		ctrvalue = int(t.counters.sent.get())
	}

	if ctrvalue == -1 {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "Counter %v does not exist", ctr)
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, strconv.Itoa(ctrvalue))
	return
}

func startToSend(w http.ResponseWriter, req *http.Request) {
	t, topicId, exist := getTopicFromRequest(w, req)
	fmt.Printf("Start to send to topic: %v\n", topicId)
	if !exist {
		return
	}

	if !t.messages.startSend() {
		w.WriteHeader(http.StatusOK)
		fmt.Fprintf(w, "Already started sending")
		return
	}
	go func() {
		p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
		if err != nil {
			fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
			return
		}
		defer func() {
			fmt.Printf("Closing producer for topic: %v\n", topicId)
			p.Close()
		}()
		for {
			q := t.messages.send
			if q == nil {
				return
			}
			m, err := q.Get(0)
			if err == nil {
				err = p.Produce(&kafka.Message{
					TopicPartition: kafka.TopicPartition{Topic: &topicId, Partition: kafka.PartitionAny},
					Value:          []byte(fmt.Sprintf("%v", m)),
				}, nil)
				if err == nil {
					q.Remove(0)
					t.counters.sent.step()
					globalCounters.sent.step()
					msg := fmt.Sprintf("%v", m)
					if len(msg) < 500 {
						fmt.Printf("Message sent on topic: %v, len: %v, msg: %v\n", topicId, len(msg), msg)
					} else {
						fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed\n", topicId, len(msg))
					}
				} else {
					fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v\n", topicId, err)
					q.Remove(0)
				}
			} else {
				time.Sleep(10 * time.Millisecond)
			}
		}
	}()

	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, "Sending started")
}

func startToReceive(w http.ResponseWriter, req *http.Request) {
	t, topicId, exist := getTopicFromRequest(w, req)
	if !exist {
		return
	}

	if !t.messages.startReceive() {
		w.WriteHeader(http.StatusOK)
		fmt.Fprintf(w, "Already started receiving")
		return
	}

	go func() {

		defer func() { t.messages.stopReceive() }()

		groudId := "kafkaprocon"

		c, err := kafka.NewConsumer(&kafka.ConfigMap{
			"bootstrap.servers":       bootstrapserver,
			"group.id":                groudId,
			"auto.offset.reset":       "earliest",
			"enable.auto.commit":      true,
			"auto.commit.interval.ms": 5000,
		})
		if err != nil {
			fmt.Printf("Cannot create consumer for topic: %v, error details: %v\n", topicId, err)
			t.messages.stopReceive()
			return
		}
		c.Commit()
		defer func() { c.Close() }()
		for {
			que := t.messages.received
			if que == nil {
				fmt.Printf("Cannot start receiving on topic: %v, queue does not exist\n", topicId)
				return
			}
			fmt.Printf("Start subscribing on topic: %v\n", topicId)
			err = c.SubscribeTopics([]string{topicId}, nil)
			if err != nil {
				fmt.Printf("Cannot start subscribing on topic: %v, error details: %v\n", topicId, err)
				return
			}
			maxDur := 1 * time.Second
			for {
				msg, err := c.ReadMessage(maxDur)
				if err == nil {
					if len(msg.Value) < 500 {
						fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v\n", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
					} else {
						fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed\n", topicId, msg.TopicPartition, len(msg.Value))
					}
					err = t.messages.received.Enqueue(string(msg.Value))
					if err != nil {
						w.WriteHeader(http.StatusInternalServerError)
						fmt.Fprintf(w, "Received message topic: %v, cannot be put in queue, %v", topicId, err)
						return
					}
					t.counters.received.step()
					globalCounters.received.step()
				} else {
					fmt.Printf("Nothing to consume on topic: %v, reason: %v\n", topicId, err)
				}
			}
		}
	}()

	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, "Receiving started")
}

// Post a message to a topic
// POST /send    content type is specified in content type
func sendToTopic(w http.ResponseWriter, req *http.Request) {

	t, topicId, exist := getTopicFromRequest(w, req)
	fmt.Printf("Send to topic: %v\n", topicId)
	if !exist {
		return
	}
	q := t.messages.send
	if q == nil {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "Sending not initiated on topic: %v", topicId)
		return
	}
	ct := req.Header.Get("Content-Type")
	if ct != t.contentType {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "Message to send content type: %v on topic: %v does not match queue content type: %v", ct, topicId, t.contentType)
		return
	}

	if ct == "application/json" {
		// decoder := json.NewDecoder(req.Body)
		// var j :=
		// err := decoder.Decode(&j)
		// if err != nil {
		// 	w.WriteHeader(http.StatusBadRequest)
		// 	w.Header().Set("Content-Type", "text/plain")
		// 	fmt.Fprintf(w, "Json payload cannot be decoded, error details %v\n", err)
		// 	return
		// }
		//m = mux.Vars(req)[""]
		if err := req.ParseForm(); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			fmt.Fprintf(w, "Json payload cannot be decoded on topic: %v, error details %v", topicId, err)
			return
		}
		b, err := ioutil.ReadAll(req.Body)
		if err == nil {
			if len(b) < 500 {
				fmt.Printf("Json payload to send on topic: %v, msg: %v\n", topicId, string(b))
			} else {
				fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
			}
		} else {
			fmt.Printf("Json payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
		}
		err = q.Enqueue(string(b))
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Fprintf(w, "Json message to send cannot be put in queue")
			return
		}
	} else if ct == "text/plain" {
		if err := req.ParseForm(); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			fmt.Fprintf(w, "Text payload to send on topic: %v cannot be decoded, error details %v\n", topicId, err)
			return
		}
		b, err := ioutil.ReadAll(req.Body)
		if err == nil {
			if len(b) < 500 {
				fmt.Printf("Text payload to send on topic: %v, msg: %v\n", topicId, string(b))
			} else {
				fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
			}
		} else {
			fmt.Printf("Text payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
		}
		err = q.Enqueue(string(b))
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Fprintf(w, "Text message to send cannot be put in queue")
			return
		}
	} else {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "Message to send, unknown content type %v", ct)
		return
	}

	w.WriteHeader(http.StatusOK)
	w.Header().Set("Content-Type", "text/plain")
	fmt.Fprintf(w, "Message to send put in queue")
}

// Get zero or one message from a topic
// GET /receive
func receiveFromTopic(w http.ResponseWriter, req *http.Request) {
	t, topicId, exist := getTopicFromRequest(w, req)
	if !exist {
		return
	}
	if t.messages.received == nil {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "Receiving not initiated on topic %v", topicId)
		return
	}

	m, err := t.messages.received.Dequeue()
	if err != nil {
		w.WriteHeader(http.StatusNoContent)
		return
	}

	w.Header().Set("Content-Type", t.contentType)
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, "%v", m)
}

// Remove the send queue to stop sending
func stopToSend(w http.ResponseWriter, req *http.Request) {
	fmt.Printf("Stop sending\n")
	t, _, exist := getTopicFromRequest(w, req)
	if !exist {
		return
	}
	t.messages.stopSend()
	w.WriteHeader(http.StatusNoContent)
}

// Remove the receive queue to stop receiving
func stopToReceive(w http.ResponseWriter, req *http.Request) {
	fmt.Printf("Stop receiving\n")
	t, _, exist := getTopicFromRequest(w, req)
	if !exist {
		return
	}
	t.messages.stopReceive()
	w.WriteHeader(http.StatusNoContent)
}

func HelloServer(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
}

func main() {

	initApp()

	r := mux.NewRouter()

	r.HandleFunc("/", healthCheck).Methods("GET")
	r.HandleFunc("/reset", allreset).Methods("POST")
	r.HandleFunc("/counters/{counter}", getCounter).Methods("GET")
	r.HandleFunc("/topics", getTopics).Methods("GET")
	r.HandleFunc("/topics/{topic}/counters/{counter}", getTopicCounter).Methods("GET")
	r.HandleFunc("/topics/{topic}", createTopic).Methods("PUT").Queries("type", "{type}")
	r.HandleFunc("/topics/{topic}", getTopic).Methods("GET")
	r.HandleFunc("/topics/{topic}/startsend", startToSend).Methods("POST")
	r.HandleFunc("/topics/{topic}/startreceive", startToReceive).Methods("POST")
	r.HandleFunc("/topics/{topic}/stopsend", stopToSend).Methods("POST")
	r.HandleFunc("/topics/{topic}/stopreceive", stopToReceive).Methods("POST")
	r.HandleFunc("/topics/{topic}/msg", sendToTopic).Methods("POST")
	r.HandleFunc("/topics/{topic}/msg", receiveFromTopic).Methods("GET")

	port := "8090"
	srv := &http.Server{
		Handler:      r,
		Addr:         ":" + port,
		WriteTimeout: 15 * time.Second,
		ReadTimeout:  15 * time.Second,
	}
	fmt.Println("Running on port: " + port)
	fmt.Printf(srv.ListenAndServe().Error())
}
