-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.py
136 lines (105 loc) · 3.85 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import sqlite3
from dataclasses import make_dataclass
from contextlib import contextmanager
from pathlib import Path
from threading import Lock
import logging
from typing import Any, Dict, List, Tuple
from .config import config as cfg
logger = logging.getLogger(__name__)
logger.setLevel(cfg.LOG_LEVEL)
class _CacheMetaMixin:
_cols = cfg.COLUMNS
def to_tuple(self) -> Tuple[Any]:
return tuple([getattr(self, k) for k in self._cols])
@classmethod
def row_to_meta(cls, row: Dict[str, Any]):
res = cls()
for k in cls._cols:
setattr(res, k, row[k])
return res
def make_cachemeta_cls(name: str):
# set default value of each field as field type's zero value
return make_dataclass(
name,
[(k, v.col_type, v.col_type()) for k, v in cfg.COLUMNS.items()],
bases=(_CacheMetaMixin,),
)
CacheMeta = make_cachemeta_cls("CacheMeta")
class OTACacheDB:
TABLE_NAME = cfg.TABLE_NAME
INIT_DB: str = (
f"CREATE TABLE {cfg.TABLE_NAME}("
+ ", ".join([f"{k} {v.col_def}" for k, v in cfg.COLUMNS.items()])
+ ")"
)
def __init__(self, db_file: str, init: bool = False):
logger.debug("init database...")
self._db_file = db_file
self._wlock = Lock()
self._closed = False
self._connect_db(init)
@contextmanager
def _general_query(self, query: str, query_param: List[Any], init: bool = False):
if not init and self._closed:
raise sqlite3.OperationalError("connect is closed")
_query_method = query.strip().split(" ")[0]
cur = self._con.cursor()
_query_handler = cur.execute
if _query_method in {"INSERT", "DELETE"}:
_query_handler = cur.executemany
_query_handler(query, query_param)
try:
yield cur
finally:
cur.close()
def close(self):
logger.debug("closing db...")
if not self._closed:
self._con.close()
self._closed = True
def _init_table(self):
logger.debug("init sqlite database...")
with self._general_query(self.INIT_DB, (), init=True):
self._con.commit()
def _connect_db(self, init: bool):
if init:
Path(self._db_file).unlink(missing_ok=True)
self._con = sqlite3.connect(self._db_file, check_same_thread=False)
self._con.row_factory = sqlite3.Row
# check if the table exists
with self._general_query(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(self.TABLE_NAME,),
init=True,
) as cur:
if cur.fetchone() is None:
self._init_table()
def remove_url_by_hash(self, *hash: str):
hash = [(h,) for h in hash]
with self._wlock, self._general_query(
f"DELETE FROM {self.TABLE_NAME} WHERE hash=?", hash
):
self._con.commit()
def remove_urls(self, *urls: str):
urls = [(u,) for u in urls]
with self._wlock, self._general_query(
f"DELETE FROM {self.TABLE_NAME} WHERE url=?", urls
):
self._con.commit()
def insert_urls(self, *cache_meta: CacheMeta):
rows = [m.to_tuple() for m in cache_meta]
with self._wlock, self._general_query(
f"INSERT OR REPLACE INTO {self.TABLE_NAME} VALUES (?,?,?,?,?)",
rows,
):
self._con.commit()
def lookup_url(self, url: str) -> CacheMeta:
with self._general_query(
f"SELECT * FROM {self.TABLE_NAME} WHERE url=?", (url,)
) as cur:
return CacheMeta.row_to_meta(cur.fetchone())
def lookup_all(self) -> CacheMeta:
with self._general_query(f"SELECT * FROM {self.TABLE_NAME}", ()) as cur:
for row in cur.fetchall():
yield CacheMeta.row_to_meta(row)