Feat: generate feed
This commit is contained in:
commit
d2c8d9a152
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
venv/
|
||||||
|
__pycache__/
|
18
main.py
Normal file
18
main.py
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
from fastapi import FastAPI, HTTPException, Response
|
||||||
|
from utils.feed_generator import generate_feed_of_user, USER_NOT_FOUND, CANNOT_ACCESS_INSTANCE, INVALID_HANDLE
|
||||||
|
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/feed/{user_handle}")
|
||||||
|
def get_feed_of_user(user_handle: str):
|
||||||
|
feed = generate_feed_of_user(user_handle)
|
||||||
|
|
||||||
|
if feed == USER_NOT_FOUND:
|
||||||
|
return HTTPException(status_code=404, detail="The user cannot be found.")
|
||||||
|
if feed == CANNOT_ACCESS_INSTANCE:
|
||||||
|
return HTTPException(status_code=400, detail="Cannot access the instance.")
|
||||||
|
if feed == INVALID_HANDLE:
|
||||||
|
return HTTPException(status_code=400, detail="The handle is invalid.")
|
||||||
|
|
||||||
|
return Response(content=feed, media_type="application/xml")
|
3
requirements.txt
Normal file
3
requirements.txt
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
fastapi[standard]==0.115.4
|
||||||
|
requests==2.32.3
|
||||||
|
rfeed==1.1.1
|
94
utils/feed_generator.py
Normal file
94
utils/feed_generator.py
Normal file
|
@ -0,0 +1,94 @@
|
||||||
|
import datetime, rfeed, requests, re
|
||||||
|
|
||||||
|
# error codes
|
||||||
|
USER_NOT_FOUND = 1
|
||||||
|
CANNOT_ACCESS_INSTANCE = 2
|
||||||
|
INVALID_HANDLE = 3
|
||||||
|
|
||||||
|
|
||||||
|
def parse_handle(user_handle: str) -> list[str] | None:
|
||||||
|
# validate user's handle
|
||||||
|
HANDLE_PATTERN = "@[a-zA-Z0-9_]+@[^\t\n\r\f\v]+"
|
||||||
|
if re.match(HANDLE_PATTERN, user_handle) is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# parse user's handle
|
||||||
|
return user_handle.split("@")[1:]
|
||||||
|
|
||||||
|
|
||||||
|
def get_statuses_of_user(user_handle: str) -> list[dict] | str:
|
||||||
|
parsed_handle = parse_handle(user_handle)
|
||||||
|
if parsed_handle is None:
|
||||||
|
return INVALID_HANDLE
|
||||||
|
[username, instance] = parsed_handle
|
||||||
|
|
||||||
|
try:
|
||||||
|
account_lookup = requests.get(f"https://{instance}/api/v1/accounts/lookup?acct={username}")
|
||||||
|
except requests.exceptions.RequestException as err:
|
||||||
|
print(err)
|
||||||
|
return CANNOT_ACCESS_INSTANCE
|
||||||
|
|
||||||
|
if not account_lookup.ok:
|
||||||
|
return USER_NOT_FOUND
|
||||||
|
|
||||||
|
account_info = account_lookup.json()
|
||||||
|
user_id = account_info["id"]
|
||||||
|
|
||||||
|
try:
|
||||||
|
statuses_get = requests.get(f"https://{instance}/api/v1/accounts/{user_id}/statuses?exclude_reblogs=true&exclude_replies=true")
|
||||||
|
except requests.exceptions.RequestException as err:
|
||||||
|
print(err)
|
||||||
|
return CANNOT_ACCESS_INSTANCE
|
||||||
|
|
||||||
|
if not statuses_get.ok:
|
||||||
|
return USER_NOT_FOUND
|
||||||
|
|
||||||
|
return statuses_get.json()
|
||||||
|
|
||||||
|
|
||||||
|
def generate_feed_of_user(user_handle: str) -> str | int:
|
||||||
|
# get user's statuses
|
||||||
|
statuses = get_statuses_of_user(user_handle)
|
||||||
|
if type(statuses) is int:
|
||||||
|
return statuses
|
||||||
|
|
||||||
|
# generate rss feed
|
||||||
|
status_items = []
|
||||||
|
for status in statuses:
|
||||||
|
content = status["content"]
|
||||||
|
|
||||||
|
# get media of status
|
||||||
|
for media in status["media_attachments"]:
|
||||||
|
if media["type"] == "image":
|
||||||
|
content += f"<img src='{media["url"]}' {f"alt='{media["description"]}'" if not media["description"] is None else ""} />"
|
||||||
|
elif media["type"] == "video" or media["type"] == "gifv":
|
||||||
|
content += f"<video><source src='{media["url"]}'>{f"<p>{media["description"]}</p>" if not media["description"] is None else ""}</source></video>"
|
||||||
|
elif media["type"] == "audio":
|
||||||
|
content += f"<audio src='{media["url"]}' controls></audio>{f"<span>{media["description"]}</span>" if not media["description"] is None else ""}"
|
||||||
|
else:
|
||||||
|
content += f"<a href='{media["url"]}'>{media["description"] if not media["description"] is None else media["url"]}</a>"
|
||||||
|
|
||||||
|
status_items.append(rfeed.Item(
|
||||||
|
description=content,
|
||||||
|
link=status["url"],
|
||||||
|
guid=rfeed.Guid(status["url"]),
|
||||||
|
pubDate=datetime.datetime.strptime(status["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
|
||||||
|
))
|
||||||
|
|
||||||
|
feed = rfeed.Feed(
|
||||||
|
title=status["account"]["display_name"],
|
||||||
|
link=status["account"]["url"],
|
||||||
|
description=status["account"]["note"],
|
||||||
|
lastBuildDate=datetime.datetime.now(),
|
||||||
|
items=status_items
|
||||||
|
)
|
||||||
|
|
||||||
|
return feed.rss()
|
||||||
|
|
||||||
|
|
||||||
|
# test
|
||||||
|
if __name__ == "__main__":
|
||||||
|
print(generate_feed_of_user("@sunwoo1524@pointless.chat"))
|
||||||
|
print(generate_feed_of_user("@Wwwsdfsefwr3wsfsfSsdf@pointless.chat"))
|
||||||
|
print(generate_feed_of_user("@sunwoo1524@owefijwoejfsjfsjfsdflks.social"))
|
||||||
|
print(generate_feed_of_user("@sunwoo1524@"))
|
Loading…
Reference in a new issue