Skip to content

Commit

Permalink
Healthcheck with Cluster info
Browse files Browse the repository at this point in the history
  • Loading branch information
OneCricketeer authored and marky-mark committed Jan 10, 2019
1 parent 745691d commit 8c0b4c7
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 37 deletions.
57 changes: 23 additions & 34 deletions src/main/scala/Api.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

import java.util.concurrent.TimeUnit

import JsonOps._
import KafkaClientActor.{Command, DescribeKafkaCluster, DescribeKafkaConsumerGroup, ListConsumers}
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.MessageDispatcher
Expand All @@ -14,17 +15,17 @@ import akka.stream.ActorMaterializer
import akka.util.Timeout
import backline.http.metrics.{StatusCodeCounterDirectives, TimerDirectives}
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.health.HealthCheck.Result
import com.codahale.metrics.json.MetricsModule
import com.fasterxml.jackson.databind.ObjectMapper
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import de.heikoseeberger.akkahttpplayjson.PlayJsonSupport
import models.{GroupInfo, Health}
import models.{GroupInfo, KafkaClusterHealthResponse}
import org.apache.kafka.clients.admin.DescribeClusterResult
import play.api.libs.json._

import scala.concurrent.{Await, Future, TimeoutException}
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.reflect.ClassTag

Expand Down Expand Up @@ -65,7 +66,6 @@ class Api(kafkaClientActorRef: ActorRef)
withTimer {
withStatusCodeCounter {
redirectToNoTrailingSlashIfPresent(StatusCodes.Found) {
import JsonOps._
handleExceptions(remoraExceptionHandler) {
path("metrics") {
complete(metricRegistry)
Expand All @@ -84,37 +84,26 @@ class Api(kafkaClientActorRef: ActorRef)
}

def healthCheck: StandardRoute = {
getHealth match {
case Health(true, message, None) => complete(message)
case Health(false, _, Some(e)) => failWith(e)
}
}

def getHealth: Health = {
val health = healthCheck("Health") {

val checkDuration = timeoutDuration.duration
val healthFuture = askFor[DescribeClusterResult](DescribeKafkaCluster).map { clusterDesc =>
val clusterNodes = clusterDesc.nodes.get(checkDuration.length, checkDuration.unit)
val clusterId = clusterDesc.clusterId.get(checkDuration.length, checkDuration.unit)
logger.debug(s"clusterId: $clusterId; nodes: $clusterNodes")

val clusterIdAvailable: Boolean = Option(clusterId).isDefined
if (clusterIdAvailable && !clusterNodes.isEmpty) {
Result.healthy("OK")
} else {
Result.unhealthy("Error connecting to Kafka Cluster")
}
}

try {
Await.result[Result](healthFuture, checkDuration)
} catch {
case e: TimeoutException => Result.unhealthy("Error connecting to Kafka Cluster")
val checkDuration = timeoutDuration.duration
val clusterHealthFuture = askFor[DescribeClusterResult](DescribeKafkaCluster).map { clusterDesc =>
val clusterId = clusterDesc.clusterId.get(checkDuration.length, checkDuration.unit)
val controller = clusterDesc.controller.get(checkDuration.length, checkDuration.unit)
val clusterNodes = clusterDesc.nodes.get(checkDuration.length, checkDuration.unit).asScala
logger.debug(s"clusterId: $clusterId; controller: $controller; nodes: $clusterNodes")

val clusterIdAvailable: Boolean = Option(clusterId).isDefined
if (clusterIdAvailable && clusterNodes.nonEmpty) {
val resp = KafkaClusterHealthResponse(
clusterId,
models.Node.from(controller),
clusterNodes.map(models.Node.from).toSeq
)
Json.toJson(resp)
} else {
Json.toJson("Error connecting to Kafka Cluster")
}

}.execute()
Health(health.isHealthy, health.getMessage, Option(health.getError))
}
complete(clusterHealthFuture)
}

/**
  * Binds the API `route` to all network interfaces on the configured port.
  *
  * @return a future completed with the server binding
  */
def start(): Future[Http.ServerBinding] = {
  val bindInterface = "0.0.0.0"
  val bindPort = settings.port
  Http().bindAndHandle(route, bindInterface, bindPort)
}
Expand Down
9 changes: 8 additions & 1 deletion src/main/scala/JsonOps.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import models.{GroupInfo, Node, PartitionAssignmentState}
import models.{GroupInfo, KafkaClusterHealthResponse, Node, PartitionAssignmentState}
import play.api.libs.functional.syntax._
import play.api.libs.json._

Expand All @@ -12,6 +12,13 @@ object JsonOps {
(__ \ "rack").writeNullable[String]
) (unlift(Node.unapply))

/**
  * JSON writer for [[KafkaClusterHealthResponse]].
  *
  * Emits an object with the keys `cluster_id`, `controller` and `nodes`,
  * in that order, taken from the corresponding case-class fields.
  */
implicit val clusterHealthWrites: Writes[KafkaClusterHealthResponse] = {
  val clusterIdField = (__ \ "cluster_id").write[String]
  val controllerField = (__ \ "controller").write[Node]
  val nodesField = (__ \ "nodes").write[Seq[Node]]

  (clusterIdField and controllerField and nodesField) (unlift(KafkaClusterHealthResponse.unapply))
}

implicit val partitionAssignmentStateWrites: Writes[PartitionAssignmentState] = (
(__ \ "group").write[String] and
(__ \ "coordinator").writeNullable[Node] and
Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/Models.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ package object models {

/**
  * Information about a consumer group. Every field is optional and defaults to `None`.
  *
  * @param state                     group state, if known
  * @param partitionAssignmentStates per-partition assignment states, if known
  * @param lagPerTopic               lag values keyed by a string (presumably the topic name), if known
  */
case class GroupInfo(
  state: Option[String] = None,
  partitionAssignmentStates: Option[Seq[PartitionAssignmentState]] = None,
  lagPerTopic: Option[Map[String, Long]] = None
)

object Node {
  /**
    * Converts a Kafka client node into this package's [[Node]] model.
    *
    * Reference-typed values are wrapped with `Option(...)` instead of `Some(...)`:
    * Kafka reports a missing rack as `null`, and `Some(null)` would be treated as a
    * present value by the JSON writer instead of being omitted like `None`.
    *
    * @param n the Kafka node to convert
    * @return the converted node; fields whose Kafka value is `null` become `None`
    */
  def from(n: org.apache.kafka.common.Node): Node =
    Node(
      id = Some(n.id),
      idString = Option(n.idString),
      host = Option(n.host),
      port = Some(n.port),
      rack = Option(n.rack)
    )
}
/**
  * Description of a cluster node. Every field is optional and defaults to `None`.
  *
  * @param id       numeric node id
  * @param idString node id as a string
  * @param host     node host name
  * @param port     node port
  * @param rack     rack the node is assigned to
  */
case class Node(
  id: Option[Int] = None,
  idString: Option[String] = None,
  host: Option[String] = None,
  port: Option[Int] = None,
  rack: Option[String] = None
)

/**
  * Cluster details carried in a cluster health response.
  *
  * @param clusterId  identifier of the cluster
  * @param controller the cluster's controller node
  * @param nodes      the nodes of the cluster
  */
case class KafkaClusterHealthResponse(
  clusterId: String,
  controller: Node,
  nodes: Seq[Node]
)

//This is a copy of the object inside the KafkaConsumerGroupService which is protected
case class PartitionAssignmentState(group: String, coordinator: Option[Node] = None, topic: Option[String] = None,
partition: Option[Int] = None, offset: Option[Long] = None, lag: Option[Long] = None,
Expand Down
19 changes: 17 additions & 2 deletions src/test/scala/ApiSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.slf4j.LoggerFactory
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
import akka.testkit.{TestActorRef, TestKit}
import de.heikoseeberger.akkahttpplayjson.PlayJsonSupport
import kafka.admin.RemoraKafkaConsumerGroupService
import models.{KafkaClusterHealthResponse, Node}
import net.manub.embeddedkafka.Codecs.stringDeserializer
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import play.api.libs.json._
Expand Down Expand Up @@ -86,10 +88,23 @@ class ApiSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Scalate
}
}

"GET" should "return a 200 to /health" in {
"GET" should "return a 200 with cluster JSON info to /health" in {
Get("/health") ~> ApiTest.route ~> check {
status should be(OK)
entityAs[String] should be("OK")
contentType should be(ContentTypes.`application/json`)

import scala.collection.JavaConverters._
import JsonOps.clusterHealthWrites

val clusterDesc = kafkaSettings.adminClient.describeCluster

val waitFor = patienceConfig.timeout
val clusterId = clusterDesc.clusterId.get(waitFor.length, waitFor.unit)
val controller = clusterDesc.controller.get(waitFor.length, waitFor.unit)
val nodes = clusterDesc.nodes.get(waitFor.length, waitFor.unit).asScala

val resp = KafkaClusterHealthResponse(clusterId, Node.from(controller), nodes.map(Node.from).toSeq)
entityAs[JsValue] should be(Json.toJson(resp))
}
}
}

0 comments on commit 8c0b4c7

Please sign in to comment.