Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
OneCricketeer authored and marky-mark committed Jan 10, 2019
1 parent 8c0b4c7 commit 517ff95
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
17 changes: 15 additions & 2 deletions src/main/scala/JsonOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,25 @@ object JsonOps {
(__ \ "rack").writeNullable[String]
) (unlift(Node.unapply))

implicit val nodeReads: Reads[Node] = (
(__ \ "id").readNullable[Int] and
(__ \ "id_string").readNullable[String] and
(__ \ "host").readNullable[String] and
(__ \ "port").readNullable[Int] and
(__ \ "rack").readNullable[String]
) (Node.apply _)

implicit val clusterHealthWrites: Writes[KafkaClusterHealthResponse] = (
(__ \ "cluster_id").write[String] and
(__ \ "controller").write[Node] and
(__ \ "nodes").write[Seq[Node]]
) (unlift(KafkaClusterHealthResponse.unapply)
)
) (unlift(KafkaClusterHealthResponse.unapply))

implicit val clusterHealthReads: Reads[KafkaClusterHealthResponse] = (
(__ \ "cluster_id").read[String] and
(__ \ "controller").read[Node] and
(__ \ "nodes").lazyRead(Reads.seq[Node])
) (KafkaClusterHealthResponse.apply _)

implicit val partitionAssignmentStateWrites: Writes[PartitionAssignmentState] = (
(__ \ "group").write[String] and
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/Models.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package object models {
case class GroupInfo(state: Option[String] = None, partitionAssignmentStates: Option[Seq[PartitionAssignmentState]] = None, lagPerTopic: Option[Map[String, Long]] = None)

object Node {
def from(n: org.apache.kafka.common.Node): Node = Node(Some(n.id), Some(n.idString), Some(n.host), Some(n.port), Some(n.rack))
def from(n: org.apache.kafka.common.Node): Node = Node(Option(n.id), Option(n.idString), Option(n.host), Option(n.port), Option(n.rack))
}
case class Node(id: Option[Int] = None, idString: Option[String] = None, host: Option[String] = None, port: Option[Int] = None, rack: Option[String] = None)

Expand Down
19 changes: 13 additions & 6 deletions src/test/scala/ApiSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import config.KafkaSettings

import scala.concurrent.duration._
import com.typesafe.config.ConfigValueFactory
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, OffsetResetStrategy}
import org.apache.kafka.common.serialization.Deserializer
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Millis, Seconds, Span}
Expand All @@ -20,6 +20,7 @@ import kafka.admin.RemoraKafkaConsumerGroupService
import models.{KafkaClusterHealthResponse, Node}
import net.manub.embeddedkafka.Codecs.stringDeserializer
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.clients.CommonClientConfigs
import play.api.libs.json._

class ApiSpec extends FlatSpec with Matchers with BeforeAndAfterAll with ScalatestRouteTest with PlayJsonSupport with Eventually {
Expand All @@ -43,9 +44,9 @@ class ApiSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Scalate

val consumerProps: Properties = {
val props = new Properties()
props.put("group.id", consumerGroup)
props.put("bootstrap.servers", kafkaHost)
props.put("auto.offset.reset", "earliest")
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaHost)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString.toLowerCase)
props
}

Expand Down Expand Up @@ -94,7 +95,7 @@ class ApiSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Scalate
contentType should be(ContentTypes.`application/json`)

import scala.collection.JavaConverters._
import JsonOps.clusterHealthWrites
import JsonOps.clusterHealthReads

val clusterDesc = kafkaSettings.adminClient.describeCluster

Expand All @@ -104,7 +105,13 @@ class ApiSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Scalate
val nodes = clusterDesc.nodes.get(waitFor.length, waitFor.unit).asScala

val resp = KafkaClusterHealthResponse(clusterId, Node.from(controller), nodes.map(Node.from).toSeq)
entityAs[JsValue] should be(Json.toJson(resp))
val entity = Json.fromJson[KafkaClusterHealthResponse](entityAs[JsValue])

assert(entity.isSuccess)
entity.get.clusterId should be(resp.clusterId)
entity.get.controller should be(resp.controller)
entity.get.nodes.length should be(1)
entity.get.nodes should be(resp.nodes)
}
}
}

0 comments on commit 517ff95

Please sign in to comment.