blob: 16abcb4e882dcdaa79f01d2256e05b91f96d6a33 [file] [log] [blame]
// -
// ========================LICENSE_START=================================
// O-RAN-SC
// %%
// Copyright (C) 2021: Nordix Foundation
// %%
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
//
package kafkaclient
import (
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
type KafkaFactory interface {
NewKafkaConsumer(topicID string) (KafkaConsumer, error)
}
type KafkaFactoryImpl struct {
BootstrapServer string
}
func (kf KafkaFactoryImpl) NewKafkaConsumer(topicID string) (KafkaConsumer, error) {
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": kf.BootstrapServer,
"group.id": "dmaap-mediator-producer",
"auto.offset.reset": "earliest",
})
if err != nil {
return nil, err
}
return KafkaConsumerImpl{consumer: consumer}, nil
}
func NewKafkaClient(factory KafkaFactory, topicID string) (KafkaClient, error) {
consumer, err := factory.NewKafkaConsumer(topicID)
if err != nil {
return KafkaClient{}, err
}
consumer.Commit()
err = consumer.Subscribe(topicID)
if err != nil {
return KafkaClient{}, err
}
return KafkaClient{consumer: consumer}, nil
}
type KafkaClient struct {
consumer KafkaConsumer
}
func (kc KafkaClient) ReadMessage() ([]byte, error) {
msg, err := kc.consumer.ReadMessage(time.Second)
if err != nil {
return nil, err
}
return msg.Value, nil
}
type KafkaConsumer interface {
Commit() ([]kafka.TopicPartition, error)
Subscribe(topic string) (err error)
ReadMessage(timeout time.Duration) (*kafka.Message, error)
}
type KafkaConsumerImpl struct {
consumer *kafka.Consumer
}
func (kc KafkaConsumerImpl) Commit() ([]kafka.TopicPartition, error) {
return kc.consumer.Commit()
}
func (kc KafkaConsumerImpl) Subscribe(topic string) error {
return kc.consumer.Subscribe(topic, nil)
}
func (kc KafkaConsumerImpl) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
return kc.consumer.ReadMessage(timeout)
}