Skip to content

Commit

Permalink
Merge pull request #6 from alexarchambault/recoverable-errors
Browse files Browse the repository at this point in the history
Mark LockError-s as either recoverable or fatal, tweak API, add README
  • Loading branch information
alexarchambault authored Dec 2, 2021
2 parents 9266e0b + d500dad commit bb1789f
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 58 deletions.
68 changes: 68 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# libdaemon-jvm

*libdaemon-jvm* is a [libdaemon](http://0pointer.de/lennart/projects/libdaemon)-inspired
library for the JVM written in Scala.

It aims at making it easier for JVM-based daemon processes to
- ensure that a single instance of it is running at a time
- rely on Unix domain sockets (or Windows named pipes) to listen to incoming connections

## Single process

*libdaemon-jvm* relies on Java file lock mechanism to ensure only a single instance
of a process is running at a time.

More concretely, it is passed a directory, where it writes or creates:
- a lock file
- a PID file
- a domain socket (except when named pipes are used on Windows)

It ensures that no-two processes relying on the same directory can run at a time, relying
on both the PID file and the domain socket to check for another running process.

## Domain sockets

*libdaemon-jvm* creates Unix domain sockets or Windows named pipes using either
- the JNI Unix domain socket and Windows named pipe support in the [ipcsocket](https://github.com/sbt/ipcsocket) library
- Unix domain socket support in Java >= 16

The ipcsocket library JNI support is only available on Linux / macOS / Windows for the
x86_64 architecture, and macOS for the ARM64 architecture (untested). For other OSes and
architectures, Java >= 16 is required.

On Windows on x86_64, *libdaemon-jvm* defaults to using ipcsocket JNI-based Windows named pipes.
On Windows but on a different architecture, it defaults to the Unix domain socket support of
Java >= 16, that happens to also work on Windows (requires a not-too-dated Windows 10 version),
but is incompatible with Windows named pipes.

On other OSes, when using Java >= 16, *libdaemon-jvm* defaults to Java's own Unix domain socket
support. On Java < 16, it only supports Linux on x86_64, or macOS on x86_64 or ARM64. Java >= 16
and ipcsocket JNI-based sockets can talk to each other on the same machine (no hard requirement
to use Java >= 16 for both clients and servers).

In all cases, when Java < 16 is supported, both Java >= 16 and Java < 16 clients and servers
can talk to each other.

## Usage

Add the following dependency to your build
```text
io.github.alexarchambault.libdaemon::libdaemon:0.0.3
```
The latest version is [![Maven Central](https://img.shields.io/maven-central/v/io.github.alexarchambault.libdaemon/libdaemon.svg)](https://maven-badges.herokuapp.com/maven-central/io.github.alexarchambault.libdaemon/libdaemon).

From the server, call `Lock.tryAcquire`, and start accepting connections on the server socket in the thunk passed to it:
```scala
import libdaemonjvm.server._
import java.nio.file._

val daemonDirectory: Path = ??? // pass a directory under the user home dir, computed with directories-jvm for example
val lockFiles = LockFiles.under(daemonDirectory, "my-app-name\\daemon") // second argument is the Windows named pipe path (that doesn't live in the file system)
Lock.tryAcquire(lockFiles) { serverSocket: Either[ServerSocket, ServerSocketChannel] =>
// serverSocket is a Right(…) when Java >= 16 Unix domain socket support is used,
// it's Left(…) when ipcsocket JNI support is used

// you should start listening on serverSocket here, and as much as possible,
// only exit this block when you are actually accepting incoming connections
}
```
23 changes: 17 additions & 6 deletions library/src/libdaemonjvm/LockFiles.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,38 @@ import java.nio.file.attribute.PosixFilePermission
import java.nio.file.StandardOpenOption
import scala.collection.JavaConverters._
import scala.util.Properties
import libdaemonjvm.server.LockError
import java.nio.channels.OverlappingFileLockException

final case class LockFiles(
lockFile: Path,
pidFile: Path,
socketPaths: SocketPaths
) {
def withLock[T](t: => T): T = {
def withLock[T](t: => Either[LockError, T]): Either[LockError, T] = {
if (!Files.exists(lockFile)) {
Files.createDirectories(lockFile.normalize.getParent)
Files.write(lockFile, Array.emptyByteArray)
}
var c: FileChannel = null
var l: FileLock = null
var c: FileChannel = null
var l: Either[OverlappingFileLockException, FileLock] = null
try {
c = FileChannel.open(lockFile, StandardOpenOption.WRITE)
l = c.lock()
t
l =
try Right(c.tryLock())
catch {
case ex: OverlappingFileLockException =>
Left(ex)
}
l match {
case Left(ex) => Left(new LockError.Locked(lockFile, ex))
case Right(null) => Left(new LockError.Locked(lockFile))
case Right(_) => t
}
}
finally {
if (l != null)
try l.release()
try l.foreach(_.release())
catch {
case _: ClosedChannelException =>
case _: IOException =>
Expand Down
18 changes: 10 additions & 8 deletions library/src/libdaemonjvm/server/Lock.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ import java.net.ServerSocket

object Lock {

def tryAcquire[T](files: LockFiles)
: Either[LockError, Either[ServerSocket, ServerSocketChannel]] =
tryAcquire(files, LockProcess.default, SocketHandler.server(files.socketPaths))
def tryAcquire[T](
files: LockFiles,
proc: LockProcess
): Either[LockError, Either[ServerSocket, ServerSocketChannel]] =
tryAcquire(files, proc, SocketHandler.server(files.socketPaths))
files: LockFiles
)(
startListening: Either[ServerSocket, ServerSocketChannel] => T
): Either[LockError, T] =
tryAcquire(files, LockProcess.default) {
val socket = SocketHandler.server(files.socketPaths)
startListening(socket)
}

def tryAcquire[T](
files: LockFiles,
proc: LockProcess,
proc: LockProcess
)(
setup: => T
): Either[LockError, T] = {

Expand Down
19 changes: 16 additions & 3 deletions library/src/libdaemonjvm/server/LockError.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,23 @@ sealed abstract class LockError(
) extends Exception(message, cause)

object LockError {

sealed abstract class RecoverableError(
message: String,
cause: Throwable = null
) extends LockError(message, cause)

sealed abstract class FatalError(
message: String,
cause: Throwable = null
) extends LockError(message, cause)

final class AlreadyRunning(val pid: Int)
extends LockError(s"Daemon already running (PID: $pid)")
extends FatalError(s"Daemon already running (PID: $pid)")
final class CannotDeleteFile(val file: Path, cause: Throwable)
extends LockError(s"Cannot delete $file", cause)
extends FatalError(s"Cannot delete $file", cause)
final class ZombieFound(val pid: Int, val connectionError: Throwable)
extends LockError(s"Cannot connect to process $pid", connectionError)
extends RecoverableError(s"Cannot connect to process $pid", connectionError)
final class Locked(val file: Path, cause: Throwable = null)
extends RecoverableError(s"$file already locked", cause)
}
46 changes: 24 additions & 22 deletions manual/server/src/libdaemonjvm/TestServer.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package libdaemonjvm

import java.io.IOException
import java.io.{Closeable, IOException}
import java.nio.file.{Files, Paths}
import java.nio.file.attribute.PosixFilePermission
import java.util.concurrent.atomic.AtomicInteger
Expand All @@ -15,6 +15,26 @@ import libdaemonjvm.server.Lock
object TestServer {
val delay = 2.seconds
def runTestClients = false

def runServer(incomingConn: () => Closeable): Unit = {
val count = new AtomicInteger
while (true) {
println("Waiting for clients")
val c = incomingConn()
val idx = count.incrementAndGet()
val runnable: Runnable = { () =>
println(s"New incoming connection $idx, closing it in $delay")
Thread.sleep(delay.toMillis)
println(s"Closing incoming connection $idx")
c.close()
println(s"Closed incoming connection $idx")
}
val t = new Thread(runnable)
t.start()
Thread.sleep(1000L) // meh, wait for server to be actually listening
}
}

def main(args: Array[String]): Unit = {
val path = Paths.get("data-dir")
if (!Properties.isWin) {
Expand All @@ -29,10 +49,9 @@ object TestServer {
)
}
val files = LockFiles.under(path, "libdaemonjvm\\test-server-client\\pipe")
val incomingConn = Lock.tryAcquire(files) match {
case Left(e) => throw e
case Right(Left(s)) => () => s.accept()
case Right(Right(s)) => () => s.accept()
Lock.tryAcquire(files)(s => runServer(() => s.fold(_.accept(), _.accept()))) match {
case Left(e) => throw e
case Right(()) =>
}

def clientRunnable(idx: Int): Runnable = { () =>
Expand All @@ -59,22 +78,5 @@ object TestServer {
runClient(3)
runClient(4)
}

val count = new AtomicInteger
while (true) {
println("Waiting for clients")
val c = incomingConn()
val idx = count.incrementAndGet()
val runnable: Runnable = { () =>
println(s"New incoming connection $idx, closing it in $delay")
Thread.sleep(delay.toMillis)
println(s"Closing incoming connection $idx")
c.close()
println(s"Closed incoming connection $idx")
}
val t = new Thread(runnable)
t.setDaemon(true)
t.start()
}
}
}
19 changes: 19 additions & 0 deletions tests/test/src/libdaemonjvm/tests/LockTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,23 @@ class LockTests extends munit.FunSuite {
}
}

test("locked") {
TestUtil.withTestDir { dir =>
val files = TestUtil.lockFiles(dir)
val e = files.withLock {
TestUtil.tryAcquire(files) { maybeChannel =>
maybeChannel match {
case Left(e: LockError.Locked) =>
case Left(otherError) =>
throw new Exception("Unexpected error type (expected Locked)", otherError)
case Right(channel) =>
sys.error("Opening new server channel should have failed")
}
}
Right(())
}
expect(e.isRight)
}
}

}
49 changes: 30 additions & 19 deletions tests/test/src/libdaemonjvm/tests/TestUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import java.util.concurrent.CountDownLatch
import java.net.Socket
import java.util.concurrent.atomic.AtomicBoolean
import libdaemonjvm.internal.SocketFile
import scala.util.control.NonFatal

object TestUtil {
private lazy val testDirBase = {
Expand Down Expand Up @@ -67,31 +68,41 @@ object TestUtil {
files: LockFiles,
proc: LockProcess
)(f: Either[LockError, Either[ServerSocket, ServerSocketChannel]] => T): T = {
var maybeServerChannel: Either[LockError, Either[ServerSocket, ServerSocketChannel]] = null
var acceptThreadOpt = Option.empty[Thread]
val accepting = new CountDownLatch(1)
val shouldStop = new AtomicBoolean(false)
var serverChannel: Either[ServerSocket, ServerSocketChannel] = null
var acceptThreadOpt = Option.empty[Thread]
val accepting = new CountDownLatch(1)
val shouldStop = new AtomicBoolean(false)
try {
maybeServerChannel = Lock.tryAcquire(files, proc)
if (Properties.isWin)
// Windows named pipes seem no to accept clients unless accept is being called on the server socket
acceptThreadOpt =
maybeServerChannel.toOption.flatMap(_.left.toOption.map(acceptAndDiscard(
_,
accepting,
() => shouldStop.get()
)))
for (t <- acceptThreadOpt) {
t.start()
accepting.await()
Thread.sleep(1000L) // waiting so that the accept call below effectively awaits client... :|
val maybeServerChannel = Lock.tryAcquire(files, proc) {
serverChannel = SocketHandler.server(files.socketPaths)
if (Properties.isWin)
// Windows named pipes seem no to accept clients unless accept is being called on the server socket
acceptThreadOpt =
serverChannel.left.toOption.map(acceptAndDiscard(
_,
accepting,
() => shouldStop.get()
))
for (t <- acceptThreadOpt) {
t.start()
accepting.await()
// waiting so that the accept call below effectively awaits client... :|
Thread.sleep(
1000L
)
}
serverChannel
}
f(maybeServerChannel)
}
finally {
shouldStop.set(true)
SocketFile.canConnect(files.socketPaths) // unblock the server thread last accept
for (e <- Option(maybeServerChannel); channel <- e)
try SocketFile.canConnect(files.socketPaths) // unblock the server thread last accept
catch {
case NonFatal(e) =>
System.err.println(s"Ignoring $e while trying to unblock last accept")
}
for (channel <- Option(serverChannel))
channel.merge.close()
}
}
Expand Down

0 comments on commit bb1789f

Please sign in to comment.