Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple plone sites on one installation via PLONE_BACKEND_HOST #110

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

- Add support for an optional Elasticsearch host in the worker via the PLONE_ELASTICSEARCH_HOST env variable @maethu

- Add support for multiple Plone sites on one installation via PLONE_BACKEND_HOST @maethu


## 5.0.0 (2022-10-11)

Expand Down
4 changes: 3 additions & 1 deletion src/collective/elasticsearch/indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def extract(self, name, data):
return data[name]["path"]

def get_query(self, name, value):
if isinstance(value, str):
if isinstance(value, str) or isinstance(value, list):
paths = value
depth = -1
navtree = False
Expand All @@ -284,6 +284,8 @@ def get_query(self, name, value):
paths = [paths]
andfilters = []
for path in paths:
if isinstance(path, tuple) or isinstance(path, list):
path, depth = path
spath = path.split("/")
gtcompare = "gt"
start = len(spath) - 1
Expand Down
26 changes: 24 additions & 2 deletions src/collective/elasticsearch/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from zope.interface import implementer
from ZTUtils.Lazy import LazyMap

import os
import warnings


Expand Down Expand Up @@ -310,7 +311,14 @@ def _bulk_call_redis(self, batch):

logger.info(f"Bulk call with {len(batch)} entries and {len(batch)} actions.")
hosts, params = utils.get_connection_settings()
bulk_update.delay(hosts, params, index_name=self.index_name, body=batch)

bulk_update.delay(
hosts,
params,
index_name=self.index_name,
body=batch,
plone_url=self.get_plone_url(),
)
logger.info("redis task created")

def update_blob(self, item):
Expand All @@ -319,7 +327,13 @@ def update_blob(self, item):
hosts, params = utils.get_connection_settings()

if item[1]:
update_file_data.delay(hosts, params, index_name=self.index_name, body=item)
update_file_data.delay(
hosts,
params,
index_name=self.index_name,
body=item,
plone_url=self.get_plone_url(),
)
logger.info("redis task to index blob data created")

def flush_indices(self):
Expand Down Expand Up @@ -349,6 +363,14 @@ def get_record_by_path(self, path: str) -> dict:
record = hits[0]["_source"] if hits else {}
return record

def get_plone_url(self):
    """Build the absolute URL of the current Plone site, if configured.

    This enables multiple Plone sites in one ZODB/storage: the
    PLONE_BACKEND_HOST env variable supplies the scheme://host[:port]
    prefix, and the site's physical path is appended to it.

    Returns:
        The site URL ("<PLONE_BACKEND_HOST>/<site-id>") when the env
        variable is set, otherwise None (consumers then fall back to
        the legacy PLONE_BACKEND setting, see redis/fetch.py).
    """
    backend_host = os.environ.get("PLONE_BACKEND_HOST", None)
    if not backend_host:
        return None
    # getPhysicalPath() starts with an empty segment, so joining with
    # "/" already yields a leading slash; strip any trailing slash from
    # the configured host to avoid a double slash in the final URL.
    site_path = "/".join(api.portal.get().getPhysicalPath())
    return backend_host.rstrip("/") + site_path

def _search(self, query, sort=None, **query_params):
""" """
if "start" in query_params:
Expand Down
2 changes: 1 addition & 1 deletion src/collective/elasticsearch/queueprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def get_data_for_es(self, uuid, attributes=None):
value = indexer()
else:
attr = getattr(obj, index_name, None)
value = attr() if callable(attr) else value
value = attr() if callable(attr) else attr
# Use str, if bytes value
value = (
value.decode("utf-8", "ignore") if isinstance(value, bytes) else value
Expand Down
16 changes: 10 additions & 6 deletions src/collective/elasticsearch/redis/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
)


def fetch_data(uuid, attributes):
backend = os.environ.get("PLONE_BACKEND", None)
url = backend + "/@elasticsearch_extractdata"
def fetch_data(plone_url, uuid, attributes):
if not plone_url:
plone_url = os.environ.get("PLONE_BACKEND", None)
url = plone_url + "/@elasticsearch_extractdata"
payload = {"uuid": uuid, "attributes:list": attributes}
response = session.get(url, params=payload, verify=False, timeout=60)
if response.status_code == 200:
Expand All @@ -30,8 +31,11 @@ def fetch_data(uuid, attributes):
raise Exception("Bad response from Plone Backend")


def fetch_blob_data(fieldname, data):
backend = os.environ.get("PLONE_BACKEND", None)
download_url = "/".join([backend, data[fieldname]["path"], "@@download", fieldname])
def fetch_blob_data(plone_url, fieldname, data):
    """Download the blob of *fieldname* from the Plone backend.

    The base URL is *plone_url* when given, otherwise the legacy
    PLONE_BACKEND env variable. Returns the file content wrapped in an
    in-memory BytesIO buffer.
    """
    base_url = plone_url or os.environ.get("PLONE_BACKEND", None)
    download_url = "/".join(
        [base_url, data[fieldname]["path"], "@@download", fieldname]
    )
    response = session_data.get(download_url)
    return io.BytesIO(response.content)
18 changes: 11 additions & 7 deletions src/collective/elasticsearch/redis/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def es_connection(hosts, **params):
) # Don't queue in tests


@job(queue, retry=Retry(max=3, interval=30))
def bulk_update(hosts, params, index_name, body):
@job(queue, connection=queue.connection, retry=Retry(max=3, interval=30))
def bulk_update(hosts, params, index_name, body, plone_url):
"""
Collects all the data and updates elasticsearch
"""
Expand All @@ -64,11 +64,15 @@ def bulk_update(hosts, params, index_name, body):
catalog_info, payload = item
action, index_info = list(catalog_info.items())[0]
if action == "index":
data = fetch_data(uuid=index_info["_id"], attributes=list(payload.keys()))
data = fetch_data(
plone_url, uuid=index_info["_id"], attributes=list(payload.keys())
)
item[1] = data
elif action == "update":
data = fetch_data(
uuid=index_info["_id"], attributes=list(payload["doc"].keys())
plone_url,
uuid=index_info["_id"],
attributes=list(payload["doc"].keys()),
)
item[1]["doc"] = data

Expand All @@ -77,8 +81,8 @@ def bulk_update(hosts, params, index_name, body):
return "Done"


@job(queue_low)
def update_file_data(hosts, params, index_name, body):
@job(queue_low, connection=queue_low.connection)
def update_file_data(hosts, params, index_name, body, plone_url):
"""
Get blob data from plone and index it via elasticsearch attachment pipeline
"""
Expand All @@ -89,7 +93,7 @@ def update_file_data(hosts, params, index_name, body):
attachments = {"attachments": []}

for fieldname, content in data.items():
file_ = fetch_blob_data(fieldname, data)
file_ = fetch_blob_data(plone_url, fieldname, data)
attachments["attachments"].append(
{
"filename": content["filename"],
Expand Down
50 changes: 49 additions & 1 deletion src/collective/elasticsearch/tests/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

ENV_FOR_REDIS = {
"PLONE_REDIS_DSN": "redis://localhost:6379/0",
"PLONE_BACKEND": "http://localhost",
"PLONE_BACKEND": "http://localhost/plone",
"PLONE_USERNAME": "admin",
"PLONE_PASSWORD": "password",
}
Expand All @@ -40,6 +40,15 @@ def test_redis_not_available_if_environ_vars_are_missing(self):
"All env vars ar available, this should be true",
)

env_for_redis_with_host = ENV_FOR_REDIS.copy()
del env_for_redis_with_host["PLONE_BACKEND"]
env_for_redis_with_host["PLONE_BACKEND_HOST"] = "http://localhost"
with mock.patch.dict(os.environ, env_for_redis_with_host):
self.assertTrue(
True,
"All env vars ar available, this should be true",
)


class TestUseRedis(BaseRedisTest):
def test_use_redis_if_configured(self):
Expand All @@ -50,6 +59,45 @@ def test_use_redis_if_configured(self):
self.assertTrue(utils.use_redis(), "Using redis should be enabled")


class TestPloneBackendHost(BaseRedisTest):
    """Integration test for URL resolution via PLONE_BACKEND_HOST.

    Runs the redis indexing flow with PLONE_BACKEND removed and
    PLONE_BACKEND_HOST pointing at the test layer's host/port, so the
    worker has to derive the full site URL from the host plus the
    portal's physical path (see manager.get_plone_url).
    """

    def setUp(self):
        super().setUp()
        # Swap PLONE_BACKEND for PLONE_BACKEND_HOST for the duration of
        # the test; the original value is put back in tearDown.
        self.original_plone_backend = os.environ["PLONE_BACKEND"]
        del os.environ["PLONE_BACKEND"]
        os.environ[
            "PLONE_BACKEND_HOST"
        ] = f'http://{self.layer["host"]}:{self.layer["port"]}'

    def tearDown(self):
        # Restore the environment so other test classes see the
        # unmodified PLONE_BACKEND value.
        super().tearDown()
        del os.environ["PLONE_BACKEND_HOST"]
        os.environ["PLONE_BACKEND"] = self.original_plone_backend

    def test_index_data_from_file_and_search(self):
        """Index a PDF File content object and search its extracted text."""
        file_path = os.path.join(os.path.dirname(__file__), "assets/test.pdf")
        with io.FileIO(file_path, "rb") as pdf:
            api.content.create(
                container=api.portal.get(),
                type="File",
                id="test-file",
                title="demo",
                file=NamedBlobFile(data=pdf.read(), filename="test.pdf"),
            )
        # commit triggers the queued indexing; wait=1 presumably blocks
        # until the redis worker has processed it — see BaseRedisTest.
        self.commit(wait=1)

        # "text" only occurs inside the PDF body: the plain catalog
        # cannot match it, elasticsearch (attachment pipeline) can.
        query = {"SearchableText": "text"}
        cat_results = self.catalog._old_searchResults(**query)
        self.assertEqual(0, len(cat_results), "Expect no result")
        es_results = self.catalog(**query)
        self.assertEqual(1, len(es_results), "Expect 1 item")

        # "demo" is the object's title, so both backends match it.
        query = {"SearchableText": "demo"}
        cat_results = self.catalog._old_searchResults(**query)
        self.assertEqual(1, len(cat_results), "Expect 1 item")
        es_results = self.catalog(**query)
        self.assertEqual(1, len(es_results), "Expect 1 item")


class TestExtractRestApiEndpoint(BaseRedisTest):
def setUp(self):
super().setUp()
Expand Down
7 changes: 5 additions & 2 deletions src/collective/elasticsearch/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,17 @@ def format_size_mb(value: int) -> str:

def is_redis_available():
    """Determine whether the redis integration could be available.

    Requires the redis module plus the DSN and credentials, and at
    least one way to reach the backend: the legacy PLONE_BACKEND full
    URL or the multi-site PLONE_BACKEND_HOST prefix.
    """
    required = (
        HAS_REDIS_MODULE,
        os.environ.get("PLONE_REDIS_DSN"),
        os.environ.get("PLONE_USERNAME"),
        os.environ.get("PLONE_PASSWORD"),
    )
    backend_options = (
        os.environ.get("PLONE_BACKEND"),
        os.environ.get("PLONE_BACKEND_HOST"),
    )
    return all(required) and any(backend_options)


def use_redis():
Expand Down
Loading