Status of project and performance improvements #679

Open
x42005e1f opened this issue Oct 26, 2024 · 15 comments · May be fixed by #691

@x42005e1f

As far as I can see from the commits, there hasn't been a single major change since December 2021; the largest changes have been config fixes, type cleanups, and dropping support for Python 3.7. The project's issues are not being resolved, and pull requests remain open. So the question is: what is the level of support for this project now?

There's a reason I'm asking. I too have implemented queues (as part of a more general package) that support communication between synchronous and asynchronous code, but with a different approach. In simple put-get tests between two threads in both directions, they appear to be ~7x faster on CPython and ~24x faster on PyPy, which mitigates the performance issue. They are not bound to the event loop. They also support various concurrency libraries, and a lot more.

janus benchmark
import asyncio

from threading import Thread

import janus


def work(in_q, out_q):
    while True:
        item = in_q.get()

        if item is None:
            break

        out_q.put(item)


async def func(in_q, out_q):
    ops = 0

    try:
        item = 42

        while True:
            await out_q.put(item)

            ops += 1

            item = await in_q.get()
    finally:
        print(ops)


async def main():
    in_q = janus.Queue()
    out_q = janus.Queue()

    Thread(target=work, args=[out_q.sync_q, in_q.sync_q]).start()

    try:
        await asyncio.wait_for(func(in_q.async_q, out_q.async_q), 6)
    except asyncio.TimeoutError:
        pass
    finally:
        await out_q.async_q.put(None)


asyncio.run(main())
aiologic benchmark
import asyncio

from threading import Thread

import aiologic


def work(in_q, out_q):
    while True:
        item = in_q.green_get()

        if item is None:
            break

        out_q.green_put(item)


async def func(in_q, out_q):
    ops = 0

    try:
        item = 42

        while True:
            await out_q.async_put(item)

            ops += 1

            item = await in_q.async_get()
    finally:
        print(ops)


async def main():
    in_q = aiologic.SimpleQueue()
    out_q = aiologic.SimpleQueue()

    Thread(target=work, args=[out_q, in_q]).start()

    try:
        await asyncio.wait_for(func(in_q, out_q), 6)
    except asyncio.TimeoutError:
        pass
    finally:
        await out_q.async_put(None)


asyncio.run(main())

I need to know what the status of your package is to determine how best to contrast mine with yours.

@asvetlov
Member

Hey!
Thanks for sharing a link to aiologic, it looks interesting!
Regarding janus status: the project is not actively developed but is still maintained. Most of the work is about keeping it working with the newest Python versions.
The API is stable; I don't see a lot of room for improvement here. The API mimics its queue and asyncio counterparts, which haven't changed in a decade.

Requests for better performance and for not binding to the event loop are reasonable; a pull request is welcome.
anyio support is more questionable. janus provides a bridge between queue.Queue-compatible and asyncio.Queue-compatible APIs. As far as I understand, anyio.Queue doesn't exist. Maybe adding such a thing makes sense, I don't know. I personally never use anyio for my job and work with asyncio only. Maybe keeping a narrow scope is not a bad thing for the library.

Anyway, thank you for your questions!

@asvetlov
Member

Please correct me if I'm wrong.
The aiologic benchmark uses gevent/eventlet in "patched" mode.
Thus, it measures not an asyncio/threaded queue but an asyncio/greenlet one. That makes a huge difference: threaded and greenlet execution models are not interchangeable.

@x42005e1f
Author

Thanks for your answers. No, the green_* methods of aiologic work with threads until the world is patched by gevent or eventlet, or until the library to use is explicitly specified (via aiologic.lowlevel.current_green_library_tlocal). If it didn't work that way, aiologic would have required dependencies; it has none, so the benchmark measures only an asyncio/threaded queue.
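The detection idea can be sketched with stdlib-only checks. This is a hypothetical illustration of the approach (the function name and logic are mine, not aiologic's actual code, which also honors the explicit current_green_library_tlocal override):

```python
import sys

def current_green_library(default: str = "threading") -> str:
    # Hypothetical sketch: if gevent's or eventlet's monkey-patching
    # machinery has been imported *and* applied to the threading
    # primitives, report that library; otherwise assume real threads.
    monkey = sys.modules.get("gevent.monkey")
    if monkey is not None and monkey.is_module_patched("threading"):
        return "gevent"
    patcher = sys.modules.get("eventlet.patcher")
    if patcher is not None and patcher.is_monkey_patched("thread"):
        return "eventlet"
    return default

print(current_green_library())  # "threading" in an unpatched process
```

Because an unpatched process never has gevent.monkey or eventlet.patcher in sys.modules, plain threads are the default, which is why no hard dependency on either library is needed.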

You might also be interested in looking at a benchmark that uses event loop methods with asyncio.Queue.

asyncio benchmark
import asyncio

from threading import Thread
from concurrent.futures import CancelledError


def work(loop, in_q, out_q):
    try:
        while True:
            item = asyncio.run_coroutine_threadsafe(in_q.get(), loop).result()

            if item is None:
                break

            loop.call_soon_threadsafe(out_q.put_nowait, item)
    except (CancelledError, RuntimeError):  # event loop is closed
        pass


async def func(in_q, out_q):
    ops = 0

    try:
        item = 42

        while True:
            await out_q.put(item)

            ops += 1

            item = await in_q.get()
    finally:
        print(ops)


async def main():
    in_q = asyncio.Queue()
    out_q = asyncio.Queue()

    Thread(target=work, args=[asyncio.get_running_loop(), out_q, in_q]).start()

    try:
        await asyncio.wait_for(func(in_q, out_q), 6)
    except asyncio.TimeoutError:
        pass
    finally:
        await out_q.put(None)


asyncio.run(main())

In terms of operations per second: janus < asyncio < aiologic (yes, my solution is even faster than the naive one). This also suggests that janus's performance problem is serious, since it loses to a naive solution by more than 4x.

@asvetlov
Member

Thanks, your measurements are very interesting.
Did you dig deeper? Do you have any ideas about the main sources of slowdown?

@x42005e1f
Author

I haven't profiled your queues, but by a simple brute-force search of possible variants I found that the bottleneck is the sync -> async case, probably due to over-synchronization in the get() method. Your implementation is not attractive as it is: you're using as many as five Condition instances, each of which uses a deque under the hood; asyncio.Lock and asyncio.Event use deques too. On Python 3.13, sys.getsizeof(deque()) == 760, so janus.Queue alone requires at least 6 kilobytes of RAM. Using different deques for notification also degrades fairness: resource starvation is possible with your queues.
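The footprint claim is easy to sanity-check locally (sizes are CPython-specific and vary by version and platform, so treat the numbers as indicative):

```python
import sys
from collections import deque

# An empty deque pre-allocates a block of slots, which is why it is
# surprisingly large; five Condition objects mean five such deques.
empty = sys.getsizeof(deque())
print(f"one empty deque: {empty} bytes")
print(f"five waiter deques alone: at least {5 * empty} bytes")
```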

I don't think there's a silver bullet here. Optimization is likely to require significant changes to the internal architecture. Speaking of which, those benchmarks have one drawback: switching costs skew the results. If we measure the sync -> async case in a single thread, the situation is even worse: the janus solution is ~12x slower than the naive solution and ~17x slower than the aiologic solution (~31x and ~44x on PyPy). Maybe you shouldn't worry about performance: performance is a feature of my package, and yours is API-level compatibility. The approaches are different, but if you need my input, I'd be happy to help.

janus benchmark
import asyncio

import janus


async def func(in_q, out_q):
    ops = 0

    try:
        item = 42
        loop = asyncio.get_running_loop()

        while True:
            loop.call_soon(out_q.put, item)

            ops += 1

            item = await in_q.get()
    finally:
        print(ops)


async def main():
    queue = janus.Queue()

    try:
        await asyncio.wait_for(func(queue.async_q, queue.sync_q), 6)
    except asyncio.TimeoutError:
        pass


asyncio.run(main())
asyncio benchmark
import asyncio


async def func(in_q, out_q):
    ops = 0

    try:
        item = 42
        loop = asyncio.get_running_loop()

        while True:
            loop.call_soon_threadsafe(out_q.put_nowait, item)

            ops += 1

            item = await in_q.get()
    finally:
        print(ops)


async def main():
    queue = asyncio.Queue()

    try:
        await asyncio.wait_for(func(queue, queue), 6)
    except asyncio.TimeoutError:
        pass


asyncio.run(main())
aiologic benchmark
import asyncio

import aiologic


async def func(in_q, out_q):
    ops = 0

    try:
        item = 42
        loop = asyncio.get_running_loop()

        while True:
            loop.call_soon(out_q.green_put, item)

            ops += 1

            item = await in_q.async_get()
    finally:
        print(ops)


async def main():
    queue = aiologic.SimpleQueue()

    try:
        await asyncio.wait_for(func(queue, queue), 6)
    except asyncio.TimeoutError:
        pass


asyncio.run(main())

Speaking of the janus way, I agree that it doesn't have to support third-party libraries; projects that are part of aio-libs natively work only with asyncio. As for late loop binding, it's not common in the asyncio world, but all asyncio primitives have had it since Python 3.10, so it makes sense to add it for compatibility with asyncio.Queue. Completely avoiding binding is unlikely to make sense for janus, since using more than one event loop is rare in the asyncio world.
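For reference, the late-binding pattern that asyncio primitives have used since Python 3.10 can be sketched as follows. This is a simplified rendition of the idea behind asyncio's internal loop-bound mixin, not janus code:

```python
import asyncio

class LateBound:
    """Capture the running loop on first use instead of in __init__,
    so the object can be created before any event loop exists."""

    def __init__(self):
        self._loop = None

    def _get_loop(self):
        loop = asyncio.get_running_loop()
        if self._loop is None:
            self._loop = loop  # bind lazily, on first use from a coroutine
        elif self._loop is not loop:
            raise RuntimeError(f"{self!r} is bound to a different event loop")
        return loop

obj = LateBound()  # safe: no loop is running yet

async def main():
    obj._get_loop()  # binding happens here, inside the running loop
    print("bound")

asyncio.run(main())
```

The key point is that construction no longer requires a loop, while accidental use from a second loop still fails loudly.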

But what confuses me is that answering questions in this repository can take months. That's strange for a maintained project with few issues: I wouldn't have asked about the project's status if I hadn't seen some issues answered only after seven months, and not even by a contributor. It would be nice to have clarity in this regard, as otherwise it's not clear where the problem lies. And yes, thank you for responding so quickly.

@x42005e1f
Author

x42005e1f commented Oct 30, 2024

Here's an idea: we could try to implement your queues on top of my primitives. That way all the complex logic would stay in my package and your implementation would be simplified. As a side effect, your queues would start supporting anyio, multiple event loops at the same time, and so on. I can test what performance this gives and let you know if you want. But since the development status of my package is alpha, it may not be a good option for you.

My primitives are not API compatible, because otherwise the implementation would get complicated and some things would just get ugly. For example, my queues (aiologic.Queue, aiologic.LifoQueue and aiologic.PriorityQueue) don't support task tracking, because an all-inclusive implementation would require either O(n) memory (a leak if task_done() is never called) or an asynchronous version of task_done() with memory allocated for additional deques. So instead, my package has CountdownEvent, which lets users implement join semantics wherever they want.
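Externalized join semantics can be sketched with a plain counter guarded by a condition variable, in the spirit of queue.Queue's unfinished-task tracking. CountdownLatch is a hypothetical name used purely for illustration, not aiologic's CountdownEvent API:

```python
import threading

class CountdownLatch:
    """Join semantics kept outside the queue: a counter of
    unfinished items plus a condition to wait on."""

    def __init__(self):
        self._cond = threading.Condition()
        self._unfinished = 0

    def up(self):  # call when an item is submitted
        with self._cond:
            self._unfinished += 1

    def down(self):  # call when an item has been processed
        with self._cond:
            self._unfinished -= 1
            if self._unfinished <= 0:
                self._cond.notify_all()

    def wait(self):  # the join(): block until all submitted work is done
        with self._cond:
            self._cond.wait_for(lambda: self._unfinished == 0)

latch = CountdownLatch()
for _ in range(3):
    latch.up()
for _ in range(3):
    latch.down()
latch.wait()
print("all done")
```

Because the counter lives outside the queue, it costs nothing for users who never call join(), which is the memory trade-off described above.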

x42005e1f changed the title from "Status of project?" to "Status of project and performance improvements" on Oct 30, 2024
@x42005e1f
Author

x42005e1f commented Oct 30, 2024

Well, I just took the queue.Queue code from Python 3.13, replaced the primitives with my own, and measured the performance. In the first tests this queue is ~4x faster than janus.Queue (about as fast as the naive solution), and in the second it is ~8x faster (slightly slower than the naive solution). It can be sped up further, but that's a good starting point.
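For context, the structure being reused is roughly the following stripped-down sketch of queue.Queue: one mutex shared by its condition variables (the real class also has not_full and all_tasks_done conditions on the same mutex):

```python
import threading
from collections import deque

class TinyQueue:
    """Minimal queue.Queue-style layout: one lock, shared by
    every Condition, guarding a plain deque."""

    def __init__(self):
        self._queue = deque()
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)

    def put(self, item):
        with self.mutex:
            self._queue.append(item)
            self.not_empty.notify()

    def get(self):
        with self.not_empty:  # acquires the shared mutex
            while not self._queue:
                self.not_empty.wait()
            return self._queue.popleft()

q = TinyQueue()
q.put(42)
print(q.get())
```

Swapping the primitives means replacing threading.Lock and threading.Condition with variants whose wait can also be awaited, while this surrounding logic stays untouched.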

@asvetlov
Member

First, please let me describe the status quo.
janus is an open source project managed by volunteers.
The library was developed mostly by me in 2015. It belongs to the aio-libs organization, which is also managed by volunteers.
I paused my OSS contributions for roughly the last two years for personal reasons. Think of it as a bus factor; that's life.
Other aio-libs developers kept many projects (aiohttp, yarl, you name it) running. I really appreciate their support.
janus doesn't attract much attention by its nature.
Moreover, the latest published version was not broken. The GitHub CI needed some love, that's true. Answering users' questions is really great and important, don't get me wrong. But we have what we have.
New contributors are welcome, sure!

Now I've found time to return. I can't guarantee I'll support it forever, but I intend to do so more or less well again. Everyone should decide for themselves whether to use the library or not.

@asvetlov
Member

Regarding janus design: the API was established about 10 years ago. Changing existing behavior breaks backward compatibility, and we cannot accept that.

If you want to make a pull request with some improvements -- you are welcome!
But please be careful.
In asyncio we spent many days trying to make the queues and locks bug-free. Everything worked well in "normal" situations, but task cancellation sometimes produced very non-obvious corner cases.
For example, see python/cpython@c47dacb69054 and python/cpython@6b53d5fe04ea.
From my experience, a Mutex+Condition based implementation is slower but easier and safer by definition.
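The hazard the commits referenced above fixed can be shown directly: a getter that is cancelled just after a put has earmarked an item for it must hand the wakeup on to the next waiter, or the item is silently lost. On patched CPython versions this sketch completes instead of timing out:

```python
import asyncio

async def main():
    q = asyncio.Queue()
    g1 = asyncio.create_task(q.get())
    g2 = asyncio.create_task(q.get())
    await asyncio.sleep(0)  # let both getters start waiting

    q.put_nowait("item")  # this wakeup is aimed at g1...
    g1.cancel()           # ...but g1 is cancelled before it resumes

    # A correct queue passes the pending wakeup on to g2 instead of
    # losing the item together with the cancelled task.
    return await asyncio.wait_for(g2, 1)

result = asyncio.run(main())
print(result)
```

This is exactly the kind of interleaving that looks impossible until a cancellation lands on the one iteration where it matters, which is why extra care in review is warranted.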

@x42005e1f
Author

OK, I understand. Your project is stable, so I don't want to break backwards compatibility either. But I don't agree that your implementation is easier and safer by definition.

The reason is simple: your code tries to use something similar to async-aware, thread-aware conditions, but they are not defined as a separate entity, which makes the implementation harder to understand and bugs harder to detect. My variant, which I measured above, uses the same implementation as the queue module, with one difference: all conditions can be used via async with and await.

In my opinion, it's much easier when asynchronous code differs from synchronous code only in how waiting is performed. It's also safer: the implementation of the primitives is simplified and contains only the logic necessary for their operation (and I've also done a lot of theoretical work to support cancellation and timeouts, so don't worry). And it's even faster, as I've shown. The waits themselves are presented as one-time events that solve only one task: waking up a thread or task when needed. In effect this is an application of the single-responsibility principle, one of the SOLID principles.
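The "one-time event" idea can be sketched as an object that parks either an OS thread or an asyncio task and is woken by a single set() from any world. This is an illustrative toy, not aiologic's implementation (which also has to handle cancellation, timeouts, and races far more carefully):

```python
import asyncio
import threading

class OneShotEvent:
    """A single-use wakeup: wait() parks a thread, wait_async()
    parks an asyncio task; set() releases whichever is waiting."""

    def __init__(self):
        self._tevent = threading.Event()
        self._loop = None
        self._future = None

    def set(self):  # safe to call from any thread
        self._tevent.set()
        if self._future is not None:
            self._loop.call_soon_threadsafe(
                lambda f=self._future: f.done() or f.set_result(True)
            )

    def wait(self):  # synchronous waiting: just block the thread
        self._tevent.wait()

    async def wait_async(self):  # asynchronous waiting: await a future
        self._loop = asyncio.get_running_loop()
        self._future = self._loop.create_future()
        if self._tevent.is_set():  # set() may already have happened
            return
        await self._future

async def main():
    ev = OneShotEvent()
    threading.Timer(0.05, ev.set).start()  # set() from another thread
    await ev.wait_async()
    return "woken"

print(asyncio.run(main()))
```

The event is disposable by design: it answers only "has the wakeup happened yet?", so there is no waiter queue or notification fan-out to reason about inside the primitive itself.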

Since I think the queue I described could still be useful, let's do this: I'll publish my alternative implementation as a separate package with an interface inspired by janus. That way we won't have to break this package's backward compatibility, and we can close some of the issues. But I'll need your help to add a link to the package in your README: it's very hard to get such niche solutions noticed nowadays, and without it users are unlikely to learn that their wishes are already implemented elsewhere.

@asvetlov
Member

asvetlov commented Nov 1, 2024

If you are asking for a link: yes, sure.

@x42005e1f
Author

Now available as Culsans.

@asvetlov
Member

asvetlov commented Nov 2, 2024

Great! Is there any chance to publish the test suite along with the source code?

@x42005e1f
Author

Yes, I'll publish tests and benchmarks as soon as I have enough time. I removed close() in favor of shutdown() from Python 3.13, so the janus tests won't work with culsans. The internal structure is also different, which is another reason. So I'll have to write my own.

@x42005e1f
Author

Done. Code coverage is 99%, tests and benchmarks are published along with the source code. Concurrency libraries other than asyncio are not covered by the test suite, but such tests will be added in the future.

x42005e1f linked a pull request on Nov 10, 2024 that will close this issue