feat: update models
dreamhunter2333 committed Jul 11, 2024
1 parent ebbaf0c commit b0922c1
Showing 8 changed files with 102 additions and 72 deletions.
1 change: 0 additions & 1 deletion .env.example
@@ -1,6 +1,5 @@
cookie_sub=xxx
max_page=50
max_workers=5
db_url=mysql+mysqlconnector://xxx:xxx/xxx
pika_url=xxx://:xxx@localhost:6379/0
bot_queue=xxx
5 changes: 5 additions & 0 deletions .flake8
@@ -0,0 +1,5 @@
[flake8]
max-line-length = 120
per-file-ignores =
# imported but unused
__init__.py: F401
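
For reference, the per-file F401 ignore covers the usual re-export pattern, where an __init__.py imports names only to expose them at package level. A minimal sketch (the actual package __init__ is not shown in this diff, so the contents below are an assumption):

    # awsl/models/__init__.py -- illustrative only
    # Without the per-file ignore, flake8 would flag these as F401 "imported but unused",
    # even though the imports exist solely to re-export the names.
    from .models import AwslProducer, Mblog, Pic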
2 changes: 2 additions & 0 deletions .gitignore
@@ -3,3 +3,5 @@ __pycache__/
venv/
.env.local
.env.*.local
.env
data.db
81 changes: 50 additions & 31 deletions awsl/awsl.py
@@ -1,10 +1,12 @@
from concurrent.futures import ThreadPoolExecutor
import re
import time
import logging

from .tools import Tools
from typing import Generator

from .tools import Tools
from .models.models import AwslProducer
from .pydantic_models import WeiboList, WeiboListItem
from .config import settings, WB_DATA_URL, WB_SHOW_URL


@@ -14,10 +16,10 @@

class WbAwsl(object):

def __init__(self, awsl_producer) -> None:
def __init__(self, awsl_producer: AwslProducer) -> None:
self.awsl_producer = awsl_producer
self.uid = awsl_producer.uid
self.max_id = int(awsl_producer.max_id) if awsl_producer.max_id else 0
self.max_id = int(awsl_producer.max_id) if awsl_producer.max_id else Tools.select_max_id(self.uid)
self.url = WB_DATA_URL.format(awsl_producer.uid)
self.keyword = awsl_producer.keyword
_logger.info("awsl init done %s" % awsl_producer)
@@ -26,49 +28,66 @@ def __init__(self, awsl_producer) -> None:
def start() -> None:
awsl_producers = Tools.find_all_awsl_producer()

with ThreadPoolExecutor(max_workers=settings.max_workers) as executor:
for awsl_producer in awsl_producers:
awsl = WbAwsl(awsl_producer)
executor.submit(awsl.run)
for awsl_producer in awsl_producers:
awsl = WbAwsl(awsl_producer)
awsl.run()

_logger.info("awsl run all awsl_producers done")

def run(self) -> None:
max_id = self.max_id or Tools.select_max_id(self.uid)
_logger.info("awsl run: uid=%s max_id=%s" % (self.uid, max_id))
"""
获取微博数据并处理
"""
_logger.info("awsl run: uid=%s max_id=%s" % (self.uid, self.max_id))
try:
for wbdata in self.get_wbdata(max_id):
if wbdata["id"] > max_id:
Tools.update_max_id(self.uid, wbdata["id"])
max_id = wbdata["id"]
try:
re_mblogid = Tools.update_mblog(self.awsl_producer, wbdata)
re_wbdata = Tools.wb_get(WB_SHOW_URL.format(
re_mblogid)) if re_mblogid else {}
Tools.send2bot(self.awsl_producer, re_mblogid, re_wbdata)
Tools.update_pic(wbdata, re_wbdata)
except Exception as e:
_logger.exception(e)
for wbdata in self.get_wbdata(self.max_id):
self.process_single(wbdata)
time.sleep(10)
except Exception as e:
_logger.exception(e)
_logger.info("awsl run: uid=%s done" % self.uid)

def get_wbdata(self, max_id: int) -> dict:
def process_single(self, wbdata: WeiboListItem) -> None:
"""
处理单条微博
"""
if wbdata.id > self.max_id:
Tools.update_max_id(self.uid, wbdata.id)
self.max_id = wbdata.id
try:
re_mblogid = Tools.update_mblog(self.awsl_producer, wbdata)
re_wbdata = Tools.wb_get(
WB_SHOW_URL.format(re_mblogid)
) if re_mblogid else {}
Tools.send2bot(self.awsl_producer, re_mblogid, re_wbdata)
Tools.update_pic(wbdata, re_wbdata)
except Exception as e:
_logger.exception(e)

def get_wbdata(self, max_id: int) -> Generator[WeiboListItem, None, None]:
"""
获取微博列表数据
"""
for page in range(1, settings.max_page):
wbdatas = Tools.wb_get(url=self.url + str(page))
wbdatas = wbdatas.get("data", {}).get(
"list", []) if wbdatas else []
raw_data = Tools.wb_get(url=self.url + str(page))

try:
wbdatas = WeiboList.model_validate(raw_data)
wbdata_list = wbdatas.data.list if wbdatas and wbdatas.data else []
except Exception as e:
_logger.exception(e)
continue

if not wbdatas:
if not wbdata_list:
return

for wbdata in wbdatas:
if wbdata["id"] <= max_id and page == 1:
for wbdata in wbdata_list:
if wbdata.id <= max_id and page == 1:
continue
elif wbdata["id"] <= max_id:
elif wbdata.id <= max_id:
return
# TODO: would a regex match be better here?
text_raw = WB_EMO.sub("", wbdata["text_raw"])
text_raw = WB_EMO.sub("", wbdata.text_raw)
if self.keyword not in text_raw:
continue
yield wbdata
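
The generator above stops paging as soon as it sees an already-processed id past page 1, while page 1 is only skipped item by item. A standalone sketch of that early-stop pattern with made-up page data (fetch_new_ids and PAGES are illustrative, not from the repository):

    from typing import Generator, List

    # fake pages of ids, newest first; page 1 may still contain older entries
    PAGES: List[List[int]] = [[12, 3, 11], [9, 8], [7, 6]]

    def fetch_new_ids(max_id: int) -> Generator[int, None, None]:
        """Yield ids newer than max_id, stopping once an old id appears past page 1."""
        for page_index, page in enumerate(PAGES, start=1):
            for item_id in page:
                if item_id <= max_id and page_index == 1:
                    # the first page may mix old and new items, so skip rather than stop
                    continue
                elif item_id <= max_id:
                    # deeper pages are ordered, so nothing newer remains
                    return
                yield item_id

    print(list(fetch_new_ids(7)))  # -> [12, 11, 9, 8]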
3 changes: 1 addition & 2 deletions awsl/config.py
@@ -14,8 +14,7 @@

class Settings(BaseSettings):
cookie_sub: str = ""
max_page: int = ""
max_workers: int = 5
max_page: int = 50
db_url: str = ""
pika_url: str = ""
bot_queue: str = ""
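
With pydantic-settings, the typed defaults such as max_page: int = 50 are overridden by environment variables or .env entries and coerced to the declared type. A minimal sketch, assuming an env_file-based setup (the real Settings configuration is outside this hunk):

    from pydantic_settings import BaseSettings, SettingsConfigDict

    class ExampleSettings(BaseSettings):
        # illustrative subset; the env_file wiring is an assumption, not from the diff
        model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

        cookie_sub: str = ""
        max_page: int = 50   # "max_page=50" in .env would be coerced to int
        db_url: str = ""

    settings = ExampleSettings()  # reads .env if present, otherwise keeps the defaults
    print(settings.max_page)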
18 changes: 18 additions & 0 deletions awsl/pydantic_models.py
@@ -0,0 +1,18 @@
from typing import List, Optional
from pydantic import BaseModel


class WeiboListItem(BaseModel):
id: int
mblogid: str
text_raw: str
user: Optional[dict] = None
retweeted_status: Optional["WeiboListItem"] = None


class WeiboListData(BaseModel):
list: Optional[List[WeiboListItem]] = None


class WeiboList(BaseModel):
data: Optional[WeiboListData] = None
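
A quick usage sketch of these models with a hand-written payload (the field values are made up): model_validate tolerates the optional fields and resolves the self-referencing retweeted_status, which is the same fallback Tools.update_mblog relies on.

    from awsl.pydantic_models import WeiboList

    raw = {
        "data": {
            "list": [
                {
                    "id": 2,
                    "mblogid": "B2",
                    "text_raw": "repost text",
                    "retweeted_status": {
                        "id": 1,
                        "mblogid": "A1",
                        "text_raw": "original text",
                        "user": {"id": 123},
                    },
                }
            ]
        }
    }

    parsed = WeiboList.model_validate(raw)
    item = parsed.data.list[0]
    origin = item.retweeted_status or item   # fall back to the post itself if not a repost
    print(origin.mblogid, origin.user)       # -> A1 {'id': 123}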
62 changes: 25 additions & 37 deletions awsl/tools.py
@@ -1,13 +1,15 @@
import json
import pika
import logging
import requests
import httpx

from typing import List
from sqlalchemy.sql import func
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from .pydantic_models import WeiboListItem

from .models.models import AwslProducer, Mblog, Pic
from .config import CHUNK_SIZE, WB_URL_PREFIX, settings, WB_COOKIE

@@ -37,96 +39,82 @@ class Tools:
@staticmethod
def wb_get(url) -> dict:
try:
res = requests.get(url=url, headers={
res = httpx.get(url=url, headers={
"cookie": WB_COOKIE.format(settings.cookie_sub)
})
res.raise_for_status()
return res.json()
except Exception as e:
_logger.exception(e)
return None

@staticmethod
def select_max_id(uid: str) -> int:
session = DBSession()
try:
with DBSession() as session:
mblog = session.query(func.max(Mblog.id)).filter(
Mblog.uid == uid).one()
finally:
session.close()
return int(mblog[0]) if mblog and mblog[0] else 0
return int(mblog[0]) if mblog and mblog[0] else 0

@staticmethod
def update_max_id(uid: str, max_id: int) -> None:
session = DBSession()
try:
with DBSession() as session:
session.query(AwslProducer).filter(
AwslProducer.uid == uid
).update({
AwslProducer.max_id: str(max_id)
})
session.commit()
finally:
session.close()

@staticmethod
def update_mblog(awsl_producer: AwslProducer, wbdata: dict) -> str:
def update_mblog(awsl_producer: AwslProducer, wbdata: WeiboListItem) -> str:
if not wbdata:
return ""
origin_wbdata = wbdata.get("retweeted_status") or wbdata
if not origin_wbdata.get("user"):
origin_wbdata = wbdata.retweeted_status or wbdata
if not origin_wbdata.user:
return ""
_logger.info("awsl update db mblog awsl_producer=%s id=%s mblogid=%s" %
(awsl_producer.name, wbdata["id"], wbdata["mblogid"]))
session = DBSession()
try:
(awsl_producer.name, wbdata.id, wbdata.mblogid))
with DBSession() as session:
mblog = Mblog(
id=wbdata["id"],
id=wbdata.id,
uid=awsl_producer.uid,
mblogid=wbdata["mblogid"],
re_id=origin_wbdata["id"],
re_mblogid=origin_wbdata["mblogid"],
re_user_id=origin_wbdata["user"]["id"],
re_user=json.dumps(origin_wbdata["user"])
mblogid=wbdata.mblogid,
re_id=origin_wbdata.id,
re_mblogid=origin_wbdata.mblogid,
re_user_id=origin_wbdata.user["id"],
re_user=json.dumps(origin_wbdata.user)
)
session.add(mblog)
session.commit()
finally:
session.close()

return origin_wbdata["mblogid"]
return origin_wbdata.mblogid

@staticmethod
def update_pic(wbdata: dict, re_wbdata: dict) -> None:
def update_pic(wbdata: WeiboListItem, re_wbdata: dict) -> None:
if not re_wbdata:
return
pic_infos = re_wbdata.get("pic_infos", {})
session = DBSession()
try:
with DBSession() as session:
for sequence, pic_id in enumerate(re_wbdata.get("pic_ids", [])):
session.add(Pic(
awsl_id=wbdata["id"],
awsl_id=wbdata.id,
sequence=sequence,
pic_id=pic_id,
pic_info=json.dumps(pic_infos[pic_id]),
))
session.commit()
finally:
session.close()

@staticmethod
def find_all_awsl_producer() -> List[AwslProducer]:
session = DBSession()
try:
with DBSession() as session:
awsl_producers = session.query(
AwslProducer
).filter(
AwslProducer.in_verification.isnot(True)
).filter(
AwslProducer.deleted.isnot(True)
).all()
finally:
session.close()
return awsl_producers
return awsl_producers

@staticmethod
def send2bot(awsl_producer: AwslProducer, re_mblogid: str, re_wbdata: dict) -> None:
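
The repeated session = DBSession() / try / finally blocks give way to the context-manager form; sessionmaker-produced Session objects support with-blocks in SQLAlchemy 1.4+ and close themselves on exit. A minimal standalone sketch, with an in-memory SQLite database standing in for the real db_url:

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import sessionmaker

    engine = create_engine("sqlite:///:memory:")  # placeholder URL, not the project's db_url
    DBSession = sessionmaker(bind=engine)

    # Equivalent to: session = DBSession(); try: ...; finally: session.close()
    with DBSession() as session:
        value = session.execute(text("SELECT 1")).scalar()
        print(value)  # -> 1
    # the session is closed here; commits still have to be issued explicitly inside the block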
2 changes: 1 addition & 1 deletion requirements.txt
@@ -2,5 +2,5 @@ mysql-connector-python==9.0.0
pika==1.3.2
pydantic==2.8.2
pydantic-settings==2.3.4
requests==2.32.3
httpx==0.27.0
SQLAlchemy==2.0.31
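
For the simple synchronous GET in Tools.wb_get, httpx mirrors the requests call shape, so only the import and the call site change. A quick sketch (URL and cookie value are placeholders, not the project's endpoints):

    import httpx

    res = httpx.get(
        "https://example.com/",                 # placeholder; real URLs come from config
        headers={"cookie": "SUB=placeholder"},  # same header shape as WB_COOKIE.format(...)
    )
    res.raise_for_status()                      # raises httpx.HTTPStatusError on 4xx/5xx
    print(res.status_code)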
