diff --git a/common/src/main/scala/org/specs2/control/eff/Cache.scala b/common/src/main/scala/org/specs2/control/eff/Cache.scala new file mode 100644 index 0000000000..9909cde7e8 --- /dev/null +++ b/common/src/main/scala/org/specs2/control/eff/Cache.scala @@ -0,0 +1,258 @@ +package org.specs2.control.eff + +import java.util +import java.util.concurrent._ + +/** + * This cache is used to memoize values for the Memoized effect + */ +trait Cache { + + type C <: Cache + + /** + * store a value for a given key, subsequent calls to memo will return the same value + */ + def memo[V](key: AnyRef, value: =>V): V + + /** + * put a value for a given key and override the previous value if present + */ + def put[V](key: AnyRef, value: V): V + + /** + * get a value for a given key + */ + def get[V](key: AnyRef): Option[V] + + /** + * remove the given key + */ + def reset(key: AnyRef): C + +} + +/** + * type class for effects which can be cached + * in a SequenceCache + */ +trait SequenceCached[M[_]] { + def apply[X](cache: Cache, key: AnyRef, sequenceKey: Int, tx: =>M[X]): M[X] +} + +case class ConcurrentHashMapCache(map: ConcurrentHashMap[AnyRef, Evaluated[Any]] = new ConcurrentHashMap[AnyRef, Evaluated[Any]]) extends Cache { + + type C = Cache + + def memo[V](key: AnyRef, value: =>V): V = { + lazy val v = value + if (map.putIfAbsent(key, Memoized(v)) == null) v + else map.get(key).value.asInstanceOf[V] + } + + def put[V](key: AnyRef, value: V): V = { + val v = Now(value) + map.put(key, v) + Option(map.get(key)).getOrElse(v).value.asInstanceOf[V] + } + + def get[V](key: AnyRef): Option[V] = + Option(map.get(key)).map(_.value.asInstanceOf[V]) + + def reset(key: AnyRef) = { + map.remove(key) + this + } +} + +case class ConcurrentWeakIdentityHashMapCache( + map: ConcurrentWeakIdentityHashMap[AnyRef, Evaluated[Any]] = new ConcurrentWeakIdentityHashMap[AnyRef, Evaluated[Any]]) extends Cache { + + type C = Cache + + def memo[V](key: AnyRef, value: =>V): V = { + lazy val v = value + if (map.putIfAbsent(key.asInstanceOf[AnyRef], Memoized(v)) == null) v + else map.get(key.asInstanceOf[AnyRef]).value.asInstanceOf[V] + } + + def put[V](key: AnyRef, value: V): V = + map.put(key, Now(value)).value.asInstanceOf[V] + + def get[V](key: AnyRef): Option[V] = + Option(map.get(key)).map(_.value.asInstanceOf[V]) + + def reset(key: AnyRef) = { + map.remove(key.hashCode) + this + } + +} + +///// + +/* + * Copyright Terracotta, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License") + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.lang.ref.ReferenceQueue +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap + +/** + * @author Alex Snaps + */ +class ConcurrentWeakIdentityHashMap[K, V] extends ConcurrentMap[K, V] { + + private val map = new ConcurrentHashMap[WeakReference[K], V] + private val queue = new ReferenceQueue[K] + + override def putIfAbsent(key: K, value: V): V = { + purgeKeys + map.putIfAbsent(newKey(key), value) + } + + def get(key: Object): V = { + purgeKeys + map.get(new WeakReference[Object](key, null)) + } + + def clear(): Unit = { + purgeKeys + map.clear + } + + def containsKey(key: Any): Boolean = { + purgeKeys + map.containsKey(new WeakReference[K](key.asInstanceOf[K], null)) + } + + def containsValue(value: Object): Boolean = { + purgeKeys + map.containsValue(value) + } + + def isEmpty: Boolean = { + purgeKeys + map.isEmpty + } + + def remove(key: Any): V = { + purgeKeys + map.remove(new WeakReference[K](key.asInstanceOf[K], null)) + } + + def size: Int = { + purgeKeys + map.size + } + + def put(key: K, value: V): V = { + purgeKeys + map.put(newKey(key), value) + } + + def keySet(): java.util.Set[K] = { + new util.AbstractSet[K] { + def iterator: java.util.Iterator[K] = { + purgeKeys + new WeakSafeIterator[K, WeakReference[K]](map.keySet.iterator) { + def extract(u: WeakReference[K]): K = u.get + } + } + + override def contains(o: Object): Boolean = ConcurrentWeakIdentityHashMap.this.containsKey(o) + def size = map.size + } + + } + + def entrySet(): java.util.Set[java.util.Map.Entry[K,V]] = new util.AbstractSet[java.util.Map.Entry[K,V]] { + def iterator: java.util.Iterator[java.util.Map.Entry[K,V]] = { + purgeKeys + new WeakSafeIterator[java.util.Map.Entry[K,V], java.util.Map.Entry[WeakReference[K], V]](map.entrySet.iterator) { + def extract(u: java.util.Map.Entry[WeakReference[K], V]): java.util.Map.Entry[K,V] = { + val key = u.getKey.get + if (key == null) null + else new java.util.AbstractMap.SimpleEntry(key, u.getValue) + } + } + } + def size = map.size + } + + def putAll(m: java.util.Map[_ <: K, _ <: V]): Unit = { + purgeKeys + import scala.collection.JavaConverters._ + m.entrySet.asScala.foreach(e => map.put(newKey(e.getKey), e.getValue)) + } + + def values: java.util.Collection[V] = { + purgeKeys + map.values + } + + + private def purgeKeys(): Unit = { + var reference = queue.poll + while (reference != null) { + reference = queue.poll + map.remove(reference) + } + } + + private def newKey(key: K): WeakReference[K] = { + new WeakReference[K](key, queue) + } + + private class WeakReference[T](referent: T, queue: ReferenceQueue[T]) extends java.lang.ref.WeakReference[T](referent, queue) { + + override def hashCode: Int = System.identityHashCode(referent) + + override def equals(a: Any): Boolean = { + a != null && a.getClass == this.getClass && + (this == a || this.get == a.asInstanceOf[WeakReference[T]].get) + } + + } + + private abstract class WeakSafeIterator[T, U](weakIterator: java.util.Iterator[U]) extends java.util.Iterator[T] { + advance + private var strongNext: T = null.asInstanceOf[T] + + def advance(): Unit = { + while (weakIterator.hasNext) { + val nextU = weakIterator.next + strongNext = extract(nextU) + if (strongNext != null) return + } + strongNext = null.asInstanceOf[T] + } + + def hasNext: Boolean = strongNext != null + + def next: T = { + val next = strongNext + advance + next + } + + override def remove = throw new UnsupportedOperationException() + + def extract(u: U): T + } + +} + diff --git a/common/src/main/scala/org/specs2/control/eff/Eff.scala b/common/src/main/scala/org/specs2/control/eff/Eff.scala index fcecbd27bb..f98ba6932f 100644 --- a/common/src/main/scala/org/specs2/control/eff/Eff.scala +++ b/common/src/main/scala/org/specs2/control/eff/Eff.scala @@ -242,6 +242,12 @@ trait EffCreation { def sequenceA[R, F[_] : Traverse, A](fs: F[Eff[R, A]]): Eff[R, F[A]] = Traverse[F].sequence(fs)(EffImplicits.EffApplicative[R]) + /** use the applicative instance of Eff to traverse a list of values, then flatten it */ + def flatTraverseA[R, F[_], A, B](fs: F[A])(f: A => Eff[R, F[B]])(implicit FT: Traverse[F], FM: Bind[F]): Eff[R, F[B]] = { + val applicative = EffImplicits.EffApplicative[R] + applicative.map(FT.traverse(fs)(f)(applicative))(FM.join) + } + } object EffCreation extends EffCreation @@ -350,9 +356,9 @@ case class Arrs[R, A, B](functions: Vector[Any => Eff[R, Any]]) extends (A => Ef Arrs(functions :+ f.asInstanceOf[Any => Eff[R, Any]]) /** map the last returned effect */ - def mapLast(f: Eff[R, B] => Eff[R, B]): Arrs[R, A, B] = + def mapLast[C](f: Eff[R, B] => Eff[R, C]): Arrs[R, A, C] = functions match { - case Vector() => this + case v if v.isEmpty => Arrs[R, A, C](v :+ ((a: Any) => f(Eff.pure(a.asInstanceOf[B])).asInstanceOf[Eff[R, Any]])) case fs :+ last => Arrs(fs :+ ((x: Any) => f(last(x).asInstanceOf[Eff[R, B]]).asInstanceOf[Eff[R, Any]])) } diff --git a/common/src/main/scala/org/specs2/control/eff/Evaluated.scala b/common/src/main/scala/org/specs2/control/eff/Evaluated.scala new file mode 100644 index 0000000000..1e6c50b2a8 --- /dev/null +++ b/common/src/main/scala/org/specs2/control/eff/Evaluated.scala @@ -0,0 +1,21 @@ +package org.specs2.control.eff + +trait Evaluated[+T] { + def value: T +} + +case class Memoized[T](t: () => T) extends Evaluated[T] { + lazy val result: T = t() + def value: T = result +} + +object Memoized { + def apply[T](t: =>T): Evaluated[T] = + Memoized(() => t) +} + +object Now { + def apply[T](t: T): Evaluated[T] = + Memoized(() => t) +} + diff --git a/common/src/main/scala/org/specs2/control/eff/ExecutorServices.scala b/common/src/main/scala/org/specs2/control/eff/ExecutorServices.scala new file mode 100644 index 0000000000..7655b64c38 --- /dev/null +++ b/common/src/main/scala/org/specs2/control/eff/ExecutorServices.scala @@ -0,0 +1,98 @@ +package org.specs2.control.eff + +import java.util.Collections +import java.util.concurrent._ + +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} + +case class ExecutorServices(executorServiceEval: Evaluated[ExecutorService], + scheduledExecutorEval: Evaluated[ScheduledExecutorService], + executionContextEval: Evaluated[ExecutionContext]) { + + /** note: shutdown only shuts down the executor services */ + def shutdown: Evaluated[Unit] = Memoized { + // careful: calling executorService.shutdown or scheduledExecutorService will deadlock! + try executorServiceEval.value.shutdown + finally scheduledExecutorEval.value.shutdown + } + + implicit lazy val executorService: ExecutorService = + executorServiceEval.value + + implicit lazy val scheduledExecutorService: ScheduledExecutorService = + scheduledExecutorEval.value + + implicit lazy val executionContext: ExecutionContext = + executionContextEval.value + + /** convenience method to shutdown the services when the final future has completed */ + def shutdownOnComplete[A](future: scala.concurrent.Future[A]): ExecutorServices = { + future.onComplete(_ => shutdown.value) + this + } + +} + +object ExecutorServices { + + lazy val threadsNb = Runtime.getRuntime.availableProcessors + + def create(implicit es: ExecutorService, s: ScheduledExecutorService): ExecutorServices = + fromExecutorServices(es, s) + + def fromExecutorServices(es: =>ExecutorService, s: =>ScheduledExecutorService): ExecutorServices = + ExecutorServices( + Memoized(es), + Memoized(s), + Memoized(createExecutionContext(es)) + ) + + def fromExecutorService(es: =>ExecutorService): ExecutorServices = + fromExecutorServices(es, scheduledExecutor(threadsNb)) + + def createExecutionContext(executorService: ExecutorService, logger: String => Unit = println): ExecutionContext = + ExecutionContext.fromExecutorService(executorService, (t: Throwable) => logger(t.getStackTrace.mkString("\n"))) + + def executor(threadsNb: Int): ExecutorService = + Executors.newFixedThreadPool(threadsNb) + + def scheduledExecutor(scheduledThreadsNb: Int): ScheduledExecutorService = + Executors.newScheduledThreadPool(scheduledThreadsNb) + + + /** + * create an ExecutionEnv from an execution context only + * + * WARNING!!! This method create a brand new scheduledExecutorService which will be used if + * you use the ExecutorServices to timeout an Async effect + */ + def fromExecutionContext(ec: =>ExecutionContext): ExecutorServices = + ExecutorServices( + Memoized(executorFromExecutionContext(ec)), + Memoized(scheduledExecutor(threadsNb)), + Memoized(ec)) + + /** taken from https://gist.github.com/viktorklang/5245161 */ + def executorFromExecutionContext(ec: =>ExecutionContext): ExecutorService = + ec match { + case null => throw null + case eces: ExecutionContextExecutorService => eces + case other => new AbstractExecutorService with ExecutionContextExecutorService { + override def prepare(): ExecutionContext = other + override def isShutdown = false + override def isTerminated = false + override def shutdown() = () + override def shutdownNow() = Collections.emptyList[Runnable] + override def execute(runnable: Runnable): Unit = other execute runnable + override def reportFailure(t: Throwable): Unit = other reportFailure t + override def awaitTermination(length: Long, unit: TimeUnit): Boolean = false + } + } + + /** create an ExecutionEnv from Scala global execution context */ + def fromGlobalExecutionContext: ExecutorServices = + fromExecutionContext(scala.concurrent.ExecutionContext.global) + +} + + diff --git a/common/src/main/scala/org/specs2/control/eff/FutureEffect.scala b/common/src/main/scala/org/specs2/control/eff/FutureEffect.scala new file mode 100644 index 0000000000..1188e8b951 --- /dev/null +++ b/common/src/main/scala/org/specs2/control/eff/FutureEffect.scala @@ -0,0 +1,156 @@ +package org.specs2.control.eff + +import java.util.concurrent.ScheduledExecutorService + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException} +import scala.util.{Failure, Success} +import scalaz._, Scalaz._ +import Eff._ + +object FutureCreation extends FutureCreation + +final case class TimedFuture[A](callback: (ScheduledExecutorService, ExecutionContext) => Future[A], timeout: Option[FiniteDuration] = None) { + @inline def runNow(sexs: ScheduledExecutorService, exc: ExecutionContext): Future[A] = { + timeout.fold { + callback(sexs, exc) + } { t => + val promise = Promise[A] + val timeout = new Runnable { + override def run(): Unit = { + val _ = promise.tryFailure(new TimeoutException) + } + } + sexs.schedule(timeout, t.length, t.unit) + promise.tryCompleteWith(callback(sexs, exc)) + promise.future + } + } +} + +object TimedFuture { + + final def ApplicativeTimedFuture: Applicative[TimedFuture] = new Applicative[TimedFuture] { + def point[A](x: =>A) = TimedFuture((_, _) => Future.successful(x)) + + def ap[A, B](fa: =>TimedFuture[A])(ff: =>TimedFuture[(A) => B]): TimedFuture[B] = { + val newCallback = { (sexs: ScheduledExecutorService, ec: ExecutionContext) => + val ffRan = ff.runNow(sexs, ec) + val faRan = fa.runNow(sexs, ec) + faRan.flatMap(a => ffRan.map(f => f(a))(ec))(ec) + } + TimedFuture(newCallback) + } + override def toString = "Applicative[TimedFuture]" + } + + implicit final def MonadTimedFuture: Monad[TimedFuture] with BindRecʹ[TimedFuture] = new Monad[TimedFuture] with BindRecʹ[TimedFuture] { + def point[A](x: => A) = TimedFuture((_, _) => Future.successful(x)) + + def bind[A, B](fa: TimedFuture[A])(f: A => TimedFuture[B]): TimedFuture[B] = + TimedFuture[B]((sexs, ec) => fa.runNow(sexs, ec).flatMap(f(_).runNow(sexs, ec))(ec)) + + def tailrecM[A, B](a: A)(f: (A) => TimedFuture[\/[A, B]]): TimedFuture[B] = + TimedFuture[B]({ (sexs, ec) => + def loop(va: A): Future[B] = f(va).runNow(sexs, ec).flatMap { + case -\/(na) => loop(na) + case \/-(nb) => Future.successful(nb) + }(ec) + loop(a) + }) + + override def toString = "Monad[TimedFuture]" + } + + implicit final def BindRecTimedFuture: BindRecʹ[TimedFuture] = + MonadTimedFuture +} + +trait FutureTypes { + type _future[R] = TimedFuture |= R + type _Future[R] = TimedFuture <= R +} + +trait FutureCreation extends FutureTypes { + + final def fromFutureWithExecutors[R :_future, A](c: (ScheduledExecutorService, ExecutionContext) => Future[A], timeout: Option[FiniteDuration] = None): Eff[R, A] = + send[TimedFuture, R, A](TimedFuture(c, timeout)) + + final def fromFuture[R :_future, A](c: => Future[A], timeout: Option[FiniteDuration] = None): Eff[R, A] = + send[TimedFuture, R, A](TimedFuture((_, _) => c, timeout)) + + final def futureFail[R :_future, A](t: Throwable): Eff[R, A] = + send[TimedFuture, R, A](TimedFuture((_, _) => Future.failed(t))) + + final def futureFromEither[R :_future, A](e: Throwable Either A): Eff[R, A] = + e.fold(futureFail[R, A], Eff.pure[R, A]) + + final def futureDelay[R :_future, A](a: => A, timeout: Option[FiniteDuration] = None): Eff[R, A] = + send[TimedFuture, R, A](TimedFuture((_, ec) => Future(a)(ec), timeout)) + + final def futureFork[R :_future, A](a: => A, ec: ExecutionContext, timeout: Option[FiniteDuration] = None): Eff[R, A] = + send[TimedFuture, R, A](TimedFuture((_, _) => Future(a)(ec), timeout)) + + final def futureDefer[R :_future, A](a: => Future[A], timeout: Option[FiniteDuration] = None): Eff[R, A] = + send[TimedFuture, R, A](TimedFuture((_, _) => a, timeout)) + +} + +trait FutureInterpretation extends FutureTypes { + + def runAsync[R, A](e: Eff[R, A])(implicit sexs: ScheduledExecutorService, exc: ExecutionContext, m: Member.Aux[TimedFuture, R, NoFx]): Future[A] = + Eff.detachA(Eff.effInto[R, Fx1[TimedFuture], A](e))(TimedFuture.MonadTimedFuture, TimedFuture.BindRecTimedFuture, TimedFuture.ApplicativeTimedFuture).runNow(sexs, exc) + + def runSequential[R, A](e: Eff[R, A])(implicit sexs: ScheduledExecutorService, exc: ExecutionContext, m: Member.Aux[TimedFuture, R, NoFx]): Future[A] = + Eff.detach(Eff.effInto[R, Fx1[TimedFuture], A](e)).runNow(sexs, exc) + + import interpret.of + + final def futureAttempt[R, A](e: Eff[R, A])(implicit future: TimedFuture /= R): Eff[R, Throwable Either A] = + interpret.interceptNatM[R, TimedFuture, Throwable Either ?, A](e, + new (TimedFuture ~> (TimedFuture of (Throwable Either ?))#l) { + override def apply[X](fa: TimedFuture[X]): TimedFuture[Throwable Either X] = attempt(fa) + }) + + final def attempt[A](a: TimedFuture[A]): TimedFuture[Throwable Either A] = { + TimedFuture[Throwable Either A](callback = (sexs, ec) => { + val prom = Promise[Throwable Either A]() + a.runNow(sexs, ec).onComplete { t => + prom.success(t match { + case Failure(ex) => Left(ex) + case Success(v) => Right(v) + }) + }(ec) + prom.future + }) + } + + final def memoize[A](key: AnyRef, cache: Cache, future: TimedFuture[A]): TimedFuture[A] = + TimedFuture { (sexs, ec) => + val prom = Promise[A]() + cache.get[A](key).fold { + prom.completeWith(future.runNow(sexs, ec).map { v => val _ = cache.put(key, v); v }(ec)) + } { v => prom.success(v) } + prom.future + } + + /** + * Memoize future values using a cache + * + * if this method is called with the same key the previous value will be returned + */ + final def futureMemo[R, A](key: AnyRef, cache: Cache, e: Eff[R, A])(implicit future: TimedFuture /= R): Eff[R, A] = + interpret.interceptNat[R, TimedFuture, A](e)( + new (TimedFuture ~> TimedFuture) { + override def apply[X](fa: TimedFuture[X]): TimedFuture[X] = memoize(key, cache, fa) + } + ) + +} + +object FutureInterpretation extends FutureInterpretation + +trait FutureEffect extends FutureCreation with FutureInterpretation + +object FutureEffect extends FutureEffect + diff --git a/common/src/main/scala/org/specs2/control/eff/Interpret.scala b/common/src/main/scala/org/specs2/control/eff/Interpret.scala index 56d8dfdf66..c9e6e22695 100644 --- a/common/src/main/scala/org/specs2/control/eff/Interpret.scala +++ b/common/src/main/scala/org/specs2/control/eff/Interpret.scala @@ -568,6 +568,56 @@ trait Interpret { } } + type of[F[_], G[_]] = {type l[A] = F[G[A]]} + + /** + * Intercept the values for one effect, + * emitting new values for the same effect inside a monad which is interleaved in + */ + def interceptNatM[R, T[_], F[_], A](effects: Eff[R, A], nat: T ~> (T `of` F)#l) + (implicit m: MemberInOut[T, R], FT: Traverse[F], FM: Monad[F]): Eff[R, F[A]] = { + effects match { + case Pure(a, last) => + pure[R, F[A]](FM.point(a)).addLast(last) + + case Impure(u, c, last) => + m.extract(u) match { + case Some(tx) => + val union = m.inject(nat(tx)) + + Impure(union, Arrs.singleton({ ex: F[u.X] => + Eff.flatTraverseA(ex)(x => interceptNatM[R, T, F, A](c(x), nat)) + }), last) + + case None => Impure(u, c.mapLast(r => interceptNatM[R, T, F, A](r, nat)), last) + } + + case ImpureAp(unions, continuation, last) => + def materialize(u: Union[R, Any]): Union[R, Any] = + m.extract(u) match { + case Some(tx) => m.inject(nat(tx).asInstanceOf[T[Any]]) + case None => u + } + + val materializedUnions = + Unions(materialize(unions.first), unions.rest.map(materialize)) + + val collected = unions.extract(m) + val continuation1 = Arrs.singleton[R, List[Any], F[A]]({ ls: List[Any] => + val xors = + ls.zipWithIndex.collect { case (a, i) => + if (collected.indices.contains(i)) a.asInstanceOf[F[Any]] + else FM.pure(a) + }.sequence + + Eff.flatTraverseA(xors)(x => interceptNatM[R, T, F, A](continuation(x), nat)) + + }) + + ImpureAp(materializedUnions, continuation1, last) + } + } + /** interpret an effect by running side-effects */ def interpretUnsafe[R, U, T[_], A](effects: Eff[R, A]) (sideEffect: SideEffect[T]) diff --git a/common/src/main/scala/org/specs2/control/eff/syntax/all.scala b/common/src/main/scala/org/specs2/control/eff/syntax/all.scala index 37a4b8c163..5859370653 100644 --- a/common/src/main/scala/org/specs2/control/eff/syntax/all.scala +++ b/common/src/main/scala/org/specs2/control/eff/syntax/all.scala @@ -12,5 +12,6 @@ trait all extends console with warnings with async with + future with eff diff --git a/common/src/main/scala/org/specs2/control/eff/syntax/future.scala b/common/src/main/scala/org/specs2/control/eff/syntax/future.scala new file mode 100644 index 0000000000..21971a84f8 --- /dev/null +++ b/common/src/main/scala/org/specs2/control/eff/syntax/future.scala @@ -0,0 +1,31 @@ +package org.specs2.control.eff +package syntax + +import java.util.concurrent.ScheduledExecutorService + +import scala.concurrent.{ExecutionContext, Future} + +object future extends future + +trait future { + + implicit final def toFutureOps[R, A](e: Eff[R, A]): FutureOps[R, A] = new FutureOps[R, A](e) + +} + +final class FutureOps[R, A](val e: Eff[R, A]) extends AnyVal { + def futureAttempt(implicit future: TimedFuture /= R): Eff[R, Throwable Either A] = + FutureInterpretation.futureAttempt(e) + + def futureMemo(key: AnyRef, cache: Cache)(implicit future: TimedFuture /= R): Eff[R, A] = + FutureInterpretation.futureMemo(key, cache, e) + + def runAsync(implicit sexs: ScheduledExecutorService, exc: ExecutionContext, m: Member.Aux[TimedFuture, R, NoFx]): Future[A] = + FutureInterpretation.runAsync(e) + + def runSequential(implicit sexs: ScheduledExecutorService, exc: ExecutionContext, m: Member.Aux[TimedFuture, R, NoFx]): Future[A] = + FutureInterpretation.runSequential(e) +} + + + diff --git a/common/src/main/scala/org/specs2/control/package.scala b/common/src/main/scala/org/specs2/control/package.scala index 7bac862bf4..bdc3500dcc 100644 --- a/common/src/main/scala/org/specs2/control/package.scala +++ b/common/src/main/scala/org/specs2/control/package.scala @@ -16,7 +16,7 @@ import WarningsEffect._ import org.specs2.control.producer._ import scala.concurrent._ -import scala.concurrent.ExecutionContext.Implicits.global +import FutureEffect._ import scala.util.control.NonFatal package object control { @@ -28,8 +28,8 @@ package object control { lazy val noLogging = (s: String) => () lazy val consoleLogging = (s: String) => println(s) - type StreamStack = Fx.fx2[Async, Safe] - type ActionStack = Fx.fx5[Async, ErrorOrOk, Console, Warnings, Safe] + type StreamStack = Fx.fx2[TimedFuture, Safe] + type ActionStack = Fx.fx5[TimedFuture, ErrorOrOk, Console, Warnings, Safe] type OperationStack = Fx.fx4[ErrorOrOk, Console, Warnings, Safe] type Action[A] = Eff[ActionStack, A] @@ -41,15 +41,14 @@ package object control { type AsyncFold[A, B] = origami.Fold[ActionStack, A, B] type AsyncSink[A] = origami.Fold[ActionStack, A, Unit] - - val asyncInterpreter = AsyncFutureInterpreter.create(global) - import asyncInterpreter._ + lazy val executorServices = ExecutorServices.fromGlobalExecutionContext + import executorServices._ def emitAsync[A](as: A*): AsyncStream[A] = producer.producers.emitSeq(as) def emitAsyncDelayed[A](a: A): AsyncStream[A] = - producer.producers.eval(asyncDelay(a)) + producer.producers.eval(futureDelay(a)) /** * warn the user about something that is probably wrong on his side, @@ -60,10 +59,10 @@ package object control { fail(failureMessage) def executeAction[A](action: Action[A], printer: String => Unit = s => ()): (Error \/ A, List[String]) = { - type S = Fx.append[Fx.fx2[Async, ErrorOrOk], Fx.fx2[Console, Warnings]] + type S = Fx.append[Fx.fx2[TimedFuture, ErrorOrOk], Fx.fx2[Console, Warnings]] Await.result(action.execSafe.flatMap(_.fold(t => exception[S, A](t), a => Eff.pure[S, A](a))). - runError.runConsoleToPrinter(printer).runWarnings.into[Fx1[Async]].runAsyncFuture, Duration.Inf) + runError.runConsoleToPrinter(printer).runWarnings.into[Fx1[TimedFuture]].runAsync, Duration.Inf) } def runAction[A](action: Action[A], printer: String => Unit = s => ()): Error \/ A = @@ -91,7 +90,7 @@ package object control { } def attemptExecuteAction[A](action: Action[A], printer: String => Unit = s => ()): Throwable \/ (Error \/ A, List[String]) = - try Await.result(action.runError.runConsoleToPrinter(printer).runWarnings.execSafe.runAsyncFuture, Duration.Inf) + try Await.result(action.runError.runConsoleToPrinter(printer).runWarnings.execSafe.runAsync, Duration.Inf) catch { case NonFatal(t) => -\/(t) } def attemptExecuteOperation[A](operation: Operation[A], printer: String => Unit = s => ()): Throwable \/ (Error \/ A, List[String]) = @@ -202,10 +201,10 @@ package object control { SafeEffect.protect(a) def asyncDelayAction[A](a: =>A): Action[A] = - AsyncEffect.asyncDelay[ActionStack, A](a) + futureDelay[ActionStack, A](a) - def asyncForkAction[A](a: =>A, timeout: Option[FiniteDuration] = None): Action[A] = - AsyncEffect.asyncFork[ActionStack, A](a, timeout) + def asyncForkAction[A](a: =>A, ec: ExecutionContext, timeout: Option[FiniteDuration] = None): Action[A] = + futureFork[ActionStack, A](a, ec, timeout) def delayed[A](a: =>A): Action[A] = ErrorEffect.ok(a) diff --git a/core/src/main/scala/org/specs2/specification/process/Executor.scala b/core/src/main/scala/org/specs2/specification/process/Executor.scala index 62b77d5610..e4bec0c7ca 100644 --- a/core/src/main/scala/org/specs2/specification/process/Executor.scala +++ b/core/src/main/scala/org/specs2/specification/process/Executor.scala @@ -84,10 +84,10 @@ trait DefaultExecutor extends Executor { def executeOneFragment(f: Fragment, timeout: Option[FiniteDuration] = None): Action[Fragment] = { if (arguments.sequential) asyncDelayAction(executeFragment(env)(f)) - else asyncForkAction(executeFragment(env)(f), timeout).asyncAttempt.map { - case -\/(t: TimeoutException) => executeFragment(env)(f.setExecution(Execution.result(Skipped("timeout"+timeout.map(" after "+_).getOrElse(""))))) - case -\/(t) => executeFragment(env)(f.setExecution(Execution.result(Error(t)))) - case \/-(f1) => f1 + else asyncForkAction(executeFragment(env)(f), env.executionContext, timeout).futureAttempt.map { + case Left(t: TimeoutException) => executeFragment(env)(f.setExecution(Execution.result(Skipped("timeout"+timeout.map(" after "+_).getOrElse(""))))) + case Left(t) => executeFragment(env)(f.setExecution(Execution.result(Error(t)))) + case Right(f1) => f1 } } diff --git a/core/src/test/scala/org/specs2/specification/ExecutorSpec.scala b/core/src/test/scala/org/specs2/specification/ExecutorSpec.scala index 5f366ba596..ee0e6b7b26 100644 --- a/core/src/test/scala/org/specs2/specification/ExecutorSpec.scala +++ b/core/src/test/scala/org/specs2/specification/ExecutorSpec.scala @@ -13,7 +13,8 @@ import specification.core._ import specification.process.DefaultExecutor import control.producer._ import ResultMatchers._ -import scala.concurrent.ExecutionContext + +import scala.concurrent._ class ExecutorSpec(implicit ec: ExecutionContext) extends script.Specification with Groups with ThrownExpectations { def is = section("travis") ^ s2""" @@ -166,8 +167,8 @@ class ExecutorSpec(implicit ec: ExecutionContext) extends script.Specification w def userEnv = { env: Env => val fragments = Fragments.foreach(1 to 10) { i: Int => - "test " + i ! Execution.withExecutionEnv { implicit ec: ExecutionEnv => - scala.concurrent.Future(1) must be_==(1).await + "test " + i ! Execution.withExecutionEnv { implicit ee: ExecutionEnv => + Await.result(scala.concurrent.Future(1), 1 second) ==== 1 } } execute(fragments.fragments, env) must contain(beSuccessful[Result]).forall