Skip to content

Commit

Permalink
Fix health error condition
Browse files Browse the repository at this point in the history
  • Loading branch information
OneCricketeer authored and marky-mark committed Jan 10, 2019
1 parent 517ff95 commit 5d07a4d
Showing 1 changed file with 27 additions and 19 deletions.
46 changes: 27 additions & 19 deletions src/main/scala/Api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model.{HttpResponse, MediaTypes, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route, StandardRoute}
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
Expand All @@ -25,7 +25,7 @@ import org.apache.kafka.clients.admin.DescribeClusterResult
import play.api.libs.json._

import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.{Future, TimeoutException}
import scala.concurrent.duration._
import scala.reflect.ClassTag

Expand Down Expand Up @@ -83,27 +83,35 @@ class Api(kafkaClientActorRef: ActorRef)
}
}

def healthCheck: StandardRoute = {
def healthCheck = {
val checkDuration = timeoutDuration.duration
val clusterHealthFuture = askFor[DescribeClusterResult](DescribeKafkaCluster).map { clusterDesc =>
val clusterId = clusterDesc.clusterId.get(checkDuration.length, checkDuration.unit)
val controller = clusterDesc.controller.get(checkDuration.length, checkDuration.unit)
val clusterNodes = clusterDesc.nodes.get(checkDuration.length, checkDuration.unit).asScala
logger.debug(s"clusterId: $clusterId; controller: $controller; nodes: $clusterNodes")

val clusterIdAvailable: Boolean = Option(clusterId).isDefined
if (clusterIdAvailable && clusterNodes.nonEmpty) {
val resp = KafkaClusterHealthResponse(
clusterId,
models.Node.from(controller),
clusterNodes.map(models.Node.from).toSeq
)
Json.toJson(resp)
} else {
Json.toJson("Error connecting to Kafka Cluster")
try {
val clusterId = clusterDesc.clusterId.get(checkDuration.length, checkDuration.unit)
val controller = clusterDesc.controller.get(checkDuration.length, checkDuration.unit)
val clusterNodes = clusterDesc.nodes.get(checkDuration.length, checkDuration.unit).asScala

logger.debug(s"clusterId: $clusterId; controller: $controller; nodes: $clusterNodes")

val clusterIdAvailable: Boolean = Option(clusterId).isDefined
if (clusterIdAvailable && clusterNodes.nonEmpty) {
val resp = KafkaClusterHealthResponse(
clusterId,
models.Node.from(controller),
clusterNodes.map(models.Node.from).toSeq
)
Json.toJson(resp)
} else {
throw new RuntimeException("Error connecting to Kafka Cluster")
}
} catch {
case _: TimeoutException =>
throw new TimeoutException("Timed out getting cluster details.")
case _: Exception =>
throw new RuntimeException("Error connecting to Kafka Cluster")
}
}
complete(clusterHealthFuture)
completeOrRecoverWith(clusterHealthFuture) { failWith }
}

def start(): Future[Http.ServerBinding] = Http().bindAndHandle(route, "0.0.0.0", settings.port)
Expand Down

0 comments on commit 5d07a4d

Please sign in to comment.