commit d2c8d9a1528603bf83a8a6648e631a9cef6b5219 Author: sunwoo1524 Date: Sat Nov 2 20:21:55 2024 +0900 Feat: generate feed diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4ea05a1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +venv/ +__pycache__/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..a39f301 --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# Mastofeed +Mastodon to RSS 2.0 feed bridge \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..2712825 --- /dev/null +++ b/main.py @@ -0,0 +1,18 @@ +from fastapi import FastAPI, HTTPException, Response +from utils.feed_generator import generate_feed_of_user, USER_NOT_FOUND, CANNOT_ACCESS_INSTANCE, INVALID_HANDLE + +app = FastAPI() + + +@app.get("/feed/{user_handle}") +def get_feed_of_user(user_handle: str): + feed = generate_feed_of_user(user_handle) + + if feed == USER_NOT_FOUND: + return HTTPException(status_code=404, detail="The user cannot be found.") + if feed == CANNOT_ACCESS_INSTANCE: + return HTTPException(status_code=400, detail="Cannot access the instance.") + if feed == INVALID_HANDLE: + return HTTPException(status_code=400, detail="The handle is invalid.") + + return Response(content=feed, media_type="application/xml") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bece47b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +fastapi[standard]==0.115.4 +requests==2.32.3 +rfeed==1.1.1 \ No newline at end of file diff --git a/utils/feed_generator.py b/utils/feed_generator.py new file mode 100644 index 0000000..52ff968 --- /dev/null +++ b/utils/feed_generator.py @@ -0,0 +1,94 @@ +import datetime, rfeed, requests, re + +# error codes +USER_NOT_FOUND = 1 +CANNOT_ACCESS_INSTANCE = 2 +INVALID_HANDLE = 3 + + +def parse_handle(user_handle: str) -> list[str] | None: + # validate user's handle + HANDLE_PATTERN = "@[a-zA-Z0-9_]+@[^\t\n\r\f\v]+" + if re.match(HANDLE_PATTERN, user_handle) is None: + return None + + # parse user's handle + return user_handle.split("@")[1:] + + +def get_statuses_of_user(user_handle: str) -> list[dict] | str: + parsed_handle = parse_handle(user_handle) + if parsed_handle is None: + return INVALID_HANDLE + [username, instance] = parsed_handle + + try: + account_lookup = requests.get(f"https://{instance}/api/v1/accounts/lookup?acct={username}") + except requests.exceptions.RequestException as err: + print(err) + return CANNOT_ACCESS_INSTANCE + + if not account_lookup.ok: + return USER_NOT_FOUND + + account_info = account_lookup.json() + user_id = account_info["id"] + + try: + statuses_get = requests.get(f"https://{instance}/api/v1/accounts/{user_id}/statuses?exclude_reblogs=true&exclude_replies=true") + except requests.exceptions.RequestException as err: + print(err) + return CANNOT_ACCESS_INSTANCE + + if not statuses_get.ok: + return USER_NOT_FOUND + + return statuses_get.json() + + +def generate_feed_of_user(user_handle: str) -> str | int: + # get user's statuses + statuses = get_statuses_of_user(user_handle) + if type(statuses) is int: + return statuses + + # generate rss feed + status_items = [] + for status in statuses: + content = status["content"] + + # get media of status + for media in status["media_attachments"]: + if media["type"] == "image": + content += f"" + elif media["type"] == "video" or media["type"] == "gifv": + content += f"" + elif media["type"] == "audio": + content += f"{f"{media["description"]}" if not media["description"] is None else ""}" + else: + content += f"{media["description"] if not media["description"] is None else media["url"]}" + + status_items.append(rfeed.Item( + description=content, + link=status["url"], + guid=rfeed.Guid(status["url"]), + pubDate=datetime.datetime.strptime(status["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ") + )) + + feed = rfeed.Feed( + title=status["account"]["display_name"], + link=status["account"]["url"], + description=status["account"]["note"], + lastBuildDate=datetime.datetime.now(), + items=status_items + ) + + return feed.rss() + + +# test +if __name__ == "__main__": + print(generate_feed_of_user("@sunwoo1524@pointless.chat")) + print(generate_feed_of_user("@Wwwsdfsefwr3wsfsfSsdf@pointless.chat")) + print(generate_feed_of_user("@sunwoo1524@owefijwoejfsjfsjfsdflks.social")) + print(generate_feed_of_user("@sunwoo1524@"))