Skip to content
This repository has been archived by the owner on May 4, 2021. It is now read-only.

Commit

Permalink
Merge pull request #68 from openstax/advisory_lock
Browse files Browse the repository at this point in the history
Use an advisory lock for exercise calculations
  • Loading branch information
Dantemss authored Sep 30, 2019
2 parents b13380f + 60bf0c8 commit 493a61b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 15 deletions.
43 changes: 29 additions & 14 deletions sparfa_server/tasks/calcs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from collections import defaultdict
from uuid import UUID
from textwrap import dedent
from random import shuffle

from sqlalchemy import text
from sqlalchemy.sql.expression import func

from ..biglearn import BLSCHED
from ..orm import transaction, Ecosystem, Page, Response, EcosystemMatrix
Expand Down Expand Up @@ -103,19 +105,32 @@ def calculate_exercises():
"""Calculate all personalized exercises"""
calculations = BLSCHED.fetch_exercise_calculations()
while calculations:
calculations_by_ecosystem_uuid = defaultdict(list)
calculation_by_uuid = {}
for calculation in calculations:
calculations_by_ecosystem_uuid[calculation['ecosystem_uuid']].append(calculation)
calculation_by_uuid[calculation['calculation_uuid']] = calculation

with transaction() as session:
# Skip calculations that refer to unknown ecosystems
# and ecosystems that we can't lock immediately
# Locking ecosystem matrices is not ideal as only one worker can update exercises
# per ecosystem at a time and we block concurrent updates to the ecosystem matrix,
# but it is our best option at the moment
# Attempt to advisory lock calculations received
# https://stackoverflow.com/a/3530326
query = [func.pg_try_advisory_xact_lock(
(UUID(uuid).int >> 64) - 2**63
).label(uuid) for uuid in calculation_by_uuid]
result = session.execute(session.query(*query).statement).first()
locked_calculations = [
calculation_by_uuid[uuid] for uuid, locked in result.items() if locked
]

# Process only calculations that we successfully locked
if not locked_calculations:
break

calculations_by_ecosystem_uuid = defaultdict(list)
for calculation in locked_calculations:
calculations_by_ecosystem_uuid[calculation['ecosystem_uuid']].append(calculation)

ecosystem_matrices = session.query(EcosystemMatrix).filter(
EcosystemMatrix.uuid.in_(calculations_by_ecosystem_uuid.keys())
).with_for_update(key_share=True, skip_locked=True).all()
).all()

if not ecosystem_matrices:
break
Expand All @@ -128,8 +143,8 @@ def calculate_exercises():
ecosystem_uuid = ecosystem_matrix.uuid
Q_ids_set = set(ecosystem_matrix.Q_ids)

calculations = calculations_by_ecosystem_uuid[ecosystem_uuid]
for calculation in calculations:
ecosystem_calculations = calculations_by_ecosystem_uuid[ecosystem_uuid]
for calculation in ecosystem_calculations:
calc_uuid = calculation['calculation_uuid']

# Partition exercise_uuids into known and unknown
Expand Down Expand Up @@ -166,18 +181,18 @@ def calculate_exercises():
exercise_calculation_requests = []
for ecosystem_matrix in ecosystem_matrices:
ecosystem_uuid = ecosystem_matrix.uuid
calculations = calculations_by_ecosystem_uuid[ecosystem_uuid]
ecosystem_calculations = calculations_by_ecosystem_uuid[ecosystem_uuid]

algs = ecosystem_matrix.to_sparfa_algs_with_student_uuids_responses(
student_uuids=[calc['student_uuid'] for calc in calculations],
student_uuids=[calc['student_uuid'] for calc in ecosystem_calculations],
responses=[
resp
for calc in calculations
for calc in ecosystem_calculations
for resp in response_dicts_by_calculation_uuid[calc['calculation_uuid']]
]
)

for calculation in calculations:
for calculation in ecosystem_calculations:
calculation_uuid = calculation['calculation_uuid']

ordered_Q_infos = algs.tesr(
Expand Down
14 changes: 13 additions & 1 deletion tests/unit/tasks/test_calcs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from uuid import uuid4
from uuid import uuid4, UUID
from random import choice, shuffle
from datetime import datetime
from unittest.mock import patch
from contextlib import closing

from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql.expression import func

from sparfa_server.orm.sessions import ENGINE
from sparfa_server.orm import Ecosystem, Page, Response, EcosystemMatrix
from sparfa_server.tasks.calcs import (calculate_ecosystem_matrices,
calculate_exercises,
Expand Down Expand Up @@ -163,6 +168,13 @@ def test_calculate_exercises(transaction):
with patch(
'sparfa_server.tasks.calcs.BLSCHED.update_exercise_calculations', autospec=True
) as update_exercise_calculations:
with closing(sessionmaker(bind=ENGINE)()) as session:
query = [func.pg_try_advisory_xact_lock(
(UUID(calculation['calculation_uuid']).int >> 64) - 2**63
) for calculation in exercise_calculations]
session.query(*query).one()
calculate_exercises()

calculate_exercises()

update_exercise_calculations.assert_not_called()
Expand Down

0 comments on commit 493a61b

Please sign in to comment.