Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support zio2 #147

Closed
wants to merge 14 commits into from
6 changes: 3 additions & 3 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
version=3.4.3
runner.dialect=scala3
maxColumn = 120
align = most
align.preset = "most"
continuationIndent.defnSite = 2
assumeStandardLibraryStripMargin = true
docstrings = JavaDoc
docstrings.style = Asterisk
lineEndings = preserve
includeCurlyBraceInSelectChains = false
danglingParentheses = true
danglingParentheses.preset = true
spaces {
inImportCurlyBraces = true
}
Expand Down
32 changes: 13 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Purely functional Scala wrapper over the official Pulsar client.

- Type-safe (utilizes Scala type system to reduce runtime exceptions present in the official Java client)
- Streaming-enabled (naturally integrates with ZIO Streams)
- ZIO integrated (uses common ZIO primitives like ZIO effect and ZManaged to reduce the boilerplate and increase expressiveness)
- ZIO integrated (uses common ZIO primitives to reduce the boilerplate and increase expressiveness)

## Compatibility

Expand Down Expand Up @@ -41,33 +41,27 @@ libraryDependencies ++= Seq(
Simple example of consumer and producer:

```scala
import org.apache.pulsar.client.api.{ PulsarClientException, Schema }
import zio._
import zio.pulsar._

object Main extends App:
object SingleMessageExample extends ZIOAppDefault:

val pulsarClient = PulsarClient.live("localhost", 6650)

val topic = "my-topic"
val topic = "single-topic"

val app: ZManaged[PulsarClient, PulsarClientException, Unit] =
val app: ZIO[PulsarClient & Scope, PulsarClientException, Unit] =
for
builder <- ConsumerBuilder.make(Schema.STRING).toManaged_
builder <- ConsumerBuilder.make(JSchema.STRING)
consumer <- builder
.topic(topic)
.subscription(
Subscription(
"my-subscription",
SubscriptionType.Shared))
.subscription(Subscription("my-subscription", SubscriptionType.Shared))
.build
producer <- Producer.make(topic, Schema.STRING)
_ <- producer.send("Hello!").toManaged_
m <- consumer.receive.toManaged_
producer <- Producer.make(topic, JSchema.STRING)
_ <- producer.send("Hello!")
m <- consumer.receive
_ = println(m.getValue)
yield ()
def run(args: List[String]): URIO[ZEnv, ExitCode] =
app.provideCustomLayer(pulsarClient).useNow.exitCode

override def run = app.provideLayer(pulsarClient ++ Scope.default).exitCode

```

## Running examples locally
Expand Down
52 changes: 26 additions & 26 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
val zioVersion = "2.0.0-RC2"
val zioVersion = "2.0.7"

inThisBuild(
List(
organization := "com.github.jczuchnowski",
homepage := Some(url("https://github.com/jczuchnowski/zio-pulsar/")),
licenses := List("BSD 2-Clause" -> url("https://opensource.org/licenses/BSD-2-Clause")),
developers := List(
homepage := Some(url("https://github.com/jczuchnowski/zio-pulsar/")),
licenses := List("BSD 2-Clause" -> url("https://opensource.org/licenses/BSD-2-Clause")),
developers := List(
Developer(
"jczuchnowski",
"Jakub Czuchnowski",
"[email protected]",
url("https://github.com/jczuchnowski")
)
),
scalaVersion := "3.1.1"
scalaVersion := "3.2.0"
)
)

Expand All @@ -23,20 +23,20 @@ addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck"
lazy val core = project
.in(file("core"))
.settings(
name := "zio-pulsar",
name := "zio-pulsar",
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zioVersion % Provided,
"dev.zio" %% "zio-streams" % zioVersion % Provided,
"dev.zio" %% "zio-json" % "0.3.0-RC3" % Provided,
"com.sksamuel.avro4s" %% "avro4s-core" % "5.0.0.M1",
"org.apache.pulsar" % "pulsar-client" % "2.9.1",
"ch.qos.logback" % "logback-classic" % "1.2.10",
"dev.zio" %% "zio-test" % zioVersion % Test,
"dev.zio" %% "zio-test-sbt" % zioVersion % Test,
"dev.zio" %% "zio-test-junit" % zioVersion % Test,
"dev.zio" %% "zio-test-magnolia" % zioVersion % Test,
"org.testcontainers" % "pulsar" % "1.16.3" % Test,
"com.dimafeng" %% "testcontainers-scala-pulsar" % "0.40.1" % Test
"dev.zio" %% "zio" % zioVersion % Provided,
"dev.zio" %% "zio-streams" % zioVersion % Provided,
"dev.zio" %% "zio-json" % "0.4.2" % Provided,
"com.sksamuel.avro4s" %% "avro4s-core" % "5.0.3",
"org.apache.pulsar" % "pulsar-client" % "2.10.0",
"ch.qos.logback" % "logback-classic" % "1.4.5",
"dev.zio" %% "zio-test" % zioVersion % Test,
"dev.zio" %% "zio-test-sbt" % zioVersion % Test,
"dev.zio" %% "zio-test-junit" % zioVersion % Test,
"dev.zio" %% "zio-test-magnolia" % zioVersion % Test,
"org.testcontainers" % "pulsar" % "1.17.6" % Test,
"com.dimafeng" %% "testcontainers-scala-pulsar" % "0.40.12" % Test
),
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
)
Expand All @@ -45,19 +45,19 @@ lazy val examples = project
.in(file("examples"))
.settings(
publish / skip := true,
moduleName := "examples",
moduleName := "examples",
libraryDependencies ++= Seq(
//"dev.zio" %% "zio-logging" % "0.5.6",
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
"dev.zio" %% "zio-json" % "0.3.0-RC3",
"com.sksamuel.avro4s" %% "avro4s-core" % "5.0.0.M1",
"ch.qos.logback" % "logback-classic" % "1.2.10"
// "dev.zio" %% "zio-logging" % "0.5.6",
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
"dev.zio" %% "zio-json" % "0.4.2",
"com.sksamuel.avro4s" %% "avro4s-core" % "5.0.3",
"ch.qos.logback" % "logback-classic" % "1.4.5"
)
)
.dependsOn(core)

lazy val root = project
lazy val `zio-pulsar` = project
.in(file("."))
.settings(
publish / skip := true
Expand Down
29 changes: 18 additions & 11 deletions core/src/main/scala/zio/pulsar/Consumer.scala
Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
package zio.pulsar

import org.apache.pulsar.client.api.{
Message,
MessageId,
Consumer => JConsumer,
PulsarClientException,
}
import org.apache.pulsar.client.api.{ Consumer as JConsumer, Message, MessageId, PulsarClientException }
import zio.{ IO, ZIO }

import java.util.concurrent.TimeUnit
//import zio.blocking._
import zio.stream._
import scala.jdk.CollectionConverters._

final class Consumer[M](val consumer: JConsumer[M]):

def acknowledge(messageId: MessageId): IO[PulsarClientException, Unit] =
ZIO.effect(consumer.acknowledge(messageId)).refineToOrDie[PulsarClientException]
ZIO.attempt(consumer.acknowledge(messageId)).refineToOrDie[PulsarClientException]

def acknowledge[T](message: Message[T]): IO[PulsarClientException, Unit] =
ZIO.attempt(consumer.acknowledge(message)).refineToOrDie[PulsarClientException]

def acknowledge(messages: Seq[MessageId]): IO[PulsarClientException, Unit] =
ZIO.attempt(consumer.acknowledge(messages.asJava)).refineToOrDie[PulsarClientException]

def negativeAcknowledge(messageId: MessageId): IO[PulsarClientException, Unit] =
ZIO.effect(consumer.negativeAcknowledge(messageId)).refineToOrDie[PulsarClientException]
ZIO.attempt(consumer.negativeAcknowledge(messageId)).refineToOrDie[PulsarClientException]

val receive: IO[PulsarClientException, Message[M]] =
ZIO.effect(consumer.receive).refineToOrDie[PulsarClientException]
ZIO.attempt(consumer.receive).refineToOrDie[PulsarClientException]

def receive(timeout: Int, unit: TimeUnit): IO[PulsarClientException, Message[M]] =
ZIO.attempt(consumer.receive(timeout, unit)).refineToOrDie[PulsarClientException]

val receiveAsync: IO[PulsarClientException, Message[M]] =
ZIO.fromCompletionStage(consumer.receiveAsync).refineToOrDie[PulsarClientException]

val receiveStream: Stream[PulsarClientException, Message[M]] =
ZStream.repeatEffect(ZIO.attemptBlocking(consumer.receive).refineToOrDie[PulsarClientException])
val receiveStream: Stream[PulsarClientException, Message[M]] =
ZStream.repeatZIO(ZIO.attemptBlocking(consumer.receive).refineToOrDie[PulsarClientException])
Loading