chen/main.py

334 行
10 KiB
Python

import asyncio
import cgi
import configparser
import io
import ipaddress
import logging
import mimetypes
import os
import random
import re
from collections import defaultdict
from urllib.parse import urlparse, parse_qs, urlunparse

import bs4
import requests
import youtube_dl
from pantomime import normalize_mimetype
from PythonSed import Sed
from slixmpp import ClientXMPP
# Splits a sed expression on "/" or "#" delimiters that are not preceded by
# a backslash.
sed_parse = re.compile(r"(?<!\\)[/#]")
# Matches a message body that starts with an `s` substitution command.
sed_cmd = re.compile(r"^s[/#].*[/#].*[/#]")
# Parser backend name handed to BeautifulSoup.
parser = "html.parser"
# User-Agent sent with outgoing HTTP requests. The two literals are wrapped
# in parentheses so that they are concatenated into a single string; without
# them the second literal is a separate no-op statement and the header is
# truncated.
user_agent = (
    "Mozilla/5.0 (X11; Linux x86_64; rv:10.0)"
    " Gecko/20100101 Firefox/10.0"
)
accept_lang = "en-US"
# Upper bound on how much of a response body is read.
data_limit = 100_000_000  # 100MB
# Default headers for every outgoing HTTP request.
headers = {
    "user-agent": user_agent,
    "Accept-Language": accept_lang,
    "Cache-Control": "no-cache",
}
# YouTube hosts whose links carry the video id in the `v` query parameter.
youtube_links = ["www.youtube.com", "m.youtube.com"]
# Short-link host; the video id is taken from the URL path.
youtube_link = "youtu.be"
# Shared youtube_dl client, used to look up video metadata without downloading.
ydl = youtube_dl.YoutubeDL()
# Invidious hosts; one is chosen at random for each mirrored link.
invidious_instances = ["invidious.snopyta.org"]
# Host names the bot refuses to fetch.
block_list = ("localhost", "127.0.0.1", "0.0.0.0")
# URL scheme markers used to spot links in a message body.
req_list = ("http://", "https://")
# MIME types treated as HTML pages (title preview) rather than as files.
html_files = ("text/html", "application/xhtml+xml")
class Lifo(list):
    """
    Bounded, newest-first list used to remember messages and URLs.

    New items are placed at the front; once the list grows past ``size``
    the item at the back (the oldest one) is dropped.
    """

    def __init__(self, size):
        super().__init__()
        self.size = size

    def add(self, item):
        """Put ``item`` at index 0 and drop the last item if over capacity."""
        self.insert(0, item)
        if len(self) <= self.size:
            return
        self.pop()
def get_youtube_title(url):
    """Return the title youtube_dl reports for ``url``, or "" on any failure."""
    try:
        return ydl.extract_info(url, download=False)["title"]
    except Exception:
        # Best effort: any lookup problem simply means "no title".
        return ""
def get_invidious_link(yurl):
    """Build a watch URL on a randomly chosen Invidious instance.

    The video id is whatever follows the last "/" in ``yurl``.
    """
    video_id = yurl.rsplit("/", 1)[-1]
    host = random.choice(invidious_instances)
    return f"https://{host}/watch?v={video_id}"
def get_yurl(path):
    """Return the ``https://youtu.be/<id>`` URL for a video.

    ``path`` may be a bare video id or a URL path such as ``/<id>``.
    Surrounding slashes are stripped so that a URL path does not produce
    ``https://youtu.be//<id>``.
    """
    video_id = path.strip("/")
    return f"https://youtu.be/{video_id}"
def _new_history():
    """Create the empty history record kept for one conversation."""
    return {
        "messages": Lifo(100),
        "links": Lifo(10),
        "previews": Lifo(10),
    }


class AngelBot(ClientXMPP):
    """XMPP client that previews links and applies sed-style corrections."""

    # Per-conversation state, keyed by the sender value the handlers pass in.
    # Each record holds recent message bodies, recently seen links and
    # recently sent previews. Defined on the class, so it is shared by all
    # instances.
    messages = defaultdict(_new_history)
def get_urls(self, msg):
    """Return the whitespace-separated words of the message body that are URLs.

    A word counts as a URL only when it *starts* with one of the markers in
    ``req_list``. Words that merely contain a marker (for example
    ``(https://example.com)``) are skipped, because they do not parse into
    a fetchable URL and would make the later HTTP request raise.
    """
    words = msg["body"].split()
    return [word for word in words if word.startswith(req_list)]
def send_youtube_info(self, uri, sender, mtype):
    """Send the title of a YouTube video followed by an Invidious link.

    The video id comes from the URL path for ``youtube_link`` hosts and
    from the ``v`` query parameter otherwise. Nothing is sent when no id
    or title can be found, or when the same title is already in the
    sender's ``previews`` history.
    """
    if uri.netloc == youtube_link:
        # The path looks like "/<id>"; drop the slashes to keep only the id.
        video_id = uri.path.strip("/")
    else:
        # parse_qs maps every key to a list of values.
        values = parse_qs(uri.query).get("v")
        video_id = values[0] if values else ""
    if not video_id:
        return

    yurl = get_yurl(video_id)
    title = get_youtube_title(yurl)
    if not title:
        return

    previews = self.messages[sender]["previews"]
    if title in previews:
        return
    previews.add(title)

    invidious = get_invidious_link(yurl)
    self.send_message(mto=sender, mbody=f"*{title}*", mtype=mtype)
    self.send_message(mto=sender, mbody=invidious, mtype=mtype)
async def parse_uri(self, uri, sender, mtype):
    """Route one parsed URL to the YouTube handler or the generic fetcher.

    URLs without a host, hosts listed in ``block_list`` and IP literals
    that are loopback or unspecified addresses are ignored, so that chat
    users cannot make the bot fetch from the machine it runs on.
    """
    if uri.netloc in (youtube_links + [youtube_link]):
        self.send_youtube_info(uri, sender, mtype)
        return

    # hostname is lower-cased and has userinfo, port and IPv6 brackets
    # removed, so "user@LOCALHOST:80" is still recognised as "localhost".
    host = uri.hostname or ""
    if not host or host in block_list:
        return
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        address = None  # a DNS name, not an IP literal
    if address is not None and (address.is_loopback or address.is_unspecified):
        return
    # NOTE(review): DNS names that resolve to local addresses are not
    # detected here.
    await self.process_link(uri, sender, mtype)
async def process_link(self, uri, sender, mtype):
    """Fetch a link and post a preview of it to ``sender``.

    HTML pages (MIME types in ``html_files``) are read up to the end of
    ``<head>`` and their ``<title>`` is sent, preceded by the final URL
    when the request was redirected. Titles already present in the
    sender's ``previews`` history are not sent again.

    Any other content is downloaded into memory (giving up once
    ``data_limit`` bytes are reached) and handed to ``embed_file``.

    Errors while handling a non-HTML download are logged and otherwise
    ignored; errors from the request itself or from HTML handling
    propagate to the caller.
    """
    log = logging.getLogger(__name__)
    url = urlunparse(uri)
    # NOTE(review): requests is synchronous, so the event loop is blocked
    # while the response is read; consider moving the fetch to an executor.
    # The context manager guarantees the streamed connection is released on
    # every exit path.
    with requests.get(url, stream=True, headers=headers, timeout=5) as r:
        if not r.ok:
            return
        ftype = normalize_mimetype(r.headers.get("content-type"))

        if ftype in html_files:
            end_of_head = b"</head>"
            raw = bytearray()
            for chunk in r.iter_content(chunk_size=1024, decode_unicode=False):
                # Only re-scan the new bytes plus enough overlap to catch a
                # closing tag that straddles two chunks.
                scan_from = max(0, len(raw) - len(end_of_head) + 1)
                raw += chunk
                if len(raw) > data_limit:
                    break
                if end_of_head in raw[scan_from:].lower():
                    break
            # Decode once at the end: decoding chunk by chunk would drop
            # multi-byte characters that are split across chunk boundaries.
            data = raw.decode("utf-8", errors="ignore")
            title = bs4.BeautifulSoup(data, parser).find("title")
            output = title.text.strip() if title else ""
            if not output:
                return
            # Single-line titles are wrapped in "*"; multi-line ones are
            # sent as they are.
            if "\n" not in output:
                output = f"*{output}*"
            previews = self.messages[sender]["previews"]
            if output in previews:
                return
            previews.add(output)
            if r.history:
                # The request was redirected: show where it ended up.
                self.send_message(mto=sender, mbody=r.url, mtype=mtype)
            self.send_message(mto=sender, mbody=output, mtype=mtype)
            return

        try:
            received = 0
            outfile = io.BytesIO()
            for chunk in r.iter_content(chunk_size=512, decode_unicode=False):
                received += len(chunk)
                if received >= data_limit:
                    return
                outfile.write(chunk)
            # The header is optional; parse_header() needs a string.
            # NOTE(review): cgi is deprecated (PEP 594) and removed in
            # Python 3.13; the email package is the suggested replacement.
            _, params = cgi.parse_header(
                r.headers.get("content-disposition") or ""
            )
            # Keep only the final component of a server-supplied name.
            filename = os.path.basename(params.get("filename") or "")
            fallback_ext = mimetypes.guess_extension(ftype) or ".bin"
            fname = (
                filename
                or uri.path.strip("/").split("/")[-1]
                or f"file{fallback_ext}"
            )
        except Exception:
            log.warning("Could not download %s", url, exc_info=True)
            return

    # The HTTP response is closed here; only the buffered copy is uploaded.
    try:
        await self.embed_file(url, sender, mtype, ftype, fname, outfile)
    except Exception:
        log.warning("Could not embed %s", url, exc_info=True)
async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
    """Upload ``outfile`` via the xep_0363 plugin and send the resulting URL.

    The uploaded file's URL is sent to ``sender`` both as the message body
    and as the message's ``oob`` URL. ``url`` is accepted but not used.
    """
    uploader = self.plugin["xep_0363"]
    furl = await uploader.upload_file(
        fname,
        content_type=ftype,
        input_file=outfile,
    )
    reply = self.make_message(sender)
    reply["body"] = furl
    reply["type"] = mtype
    reply["oob"]["url"] = furl
    reply.send()
async def parse_urls(self, msg, urls, sender, mtype):
    """Preview every not-yet-seen URL of a message.

    Nothing is done when the message body contains "nsfw" (in any case).
    URLs already present in the sender's ``links`` history are skipped. A
    failure while previewing one URL is logged and does not prevent the
    remaining URLs from being processed.
    """
    if "nsfw" in msg["body"].lower():
        return
    seen = self.messages[sender]["links"]
    for u in urls:
        if u in seen:
            continue
        seen.add(u)
        try:
            await self.parse_uri(urlparse(u), sender, mtype)
        except Exception:
            logging.getLogger(__name__).warning(
                "Could not preview %s", u, exc_info=True
            )
def sed_command(self, msg, sender, mtype):
    """Record a message, or apply it as an ``s`` command to an earlier one.

    A body that does not match ``sed_cmd`` is stored in the sender's
    ``messages`` history. Otherwise the most recent stored message that
    contains the command's pattern as a literal substring is run through
    the sed script; the result is stored in the history and sent back.

    Returns the result of ``send_message`` when a correction was sent,
    otherwise ``None``. Any error is logged at debug level and ignored.
    """
    history = self.messages[sender]["messages"]
    try:
        text = msg["body"]
        if not sed_cmd.match(text):
            history.add(text)
            return None

        # Split on the delimiter this command actually uses (the character
        # right after "s"), so that the other accepted delimiter may
        # appear inside the pattern or the replacement.
        delimiter = text[1]
        sed_args = re.split(r"(?<!\\)" + re.escape(delimiter), text)
        if len(sed_args) < 4:
            return None
        # An escaped delimiter in the pattern stands for the bare character
        # in the text being searched.
        needle = sed_args[1].replace("\\" + delimiter, delimiter)

        # NOTE(review): the whole message body is loaded as a sed script.
        # Confirm that PythonSed cannot be made to read or write files
        # (e.g. r/w commands or the w flag) from untrusted chat input.
        sed = Sed()
        sed.load_string(text)
        for previous in history:
            if needle not in previous:
                continue
            res = "\n".join(sed.apply(io.StringIO(previous), None))
            history.add(res)
            return self.send_message(
                mto=sender,
                mbody=res,
                mtype=mtype,
            )
    except Exception:
        logging.getLogger(__name__).debug(
            "sed command could not be applied", exc_info=True
        )
    return None
def __init__(self, jid, password, nick="angel", autojoin=None):
    """Set up the client, its plugins and its event handlers.

    ``nick`` is the nickname used when joining rooms and ``autojoin`` is
    the list of rooms joined once the session has started.
    """
    ClientXMPP.__init__(self, jid, password)
    self.jid = jid
    self.nick = nick
    self.autojoin = autojoin if autojoin else []

    plugin_names = (
        "xep_0030",
        "xep_0060",
        "xep_0054",
        "xep_0045",
        "xep_0066",
        "xep_0084",
        "xep_0153",
        "xep_0363",
    )
    for plugin_name in plugin_names:
        self.register_plugin(plugin_name)

    event_handlers = (
        ("session_start", self.session_start),
        ("message", self.message),
        ("groupchat_message", self.muc_message),
        # Reconnect whenever the connection is lost.
        ("disconnected", lambda _: self.connect()),
    )
    for event_name, handler in event_handlers:
        self.add_event_handler(event_name, handler)
async def session_start(self, event):
    """Announce presence, fetch the roster, publish profile data, join rooms.

    A room that cannot be joined is logged and skipped so that the
    remaining rooms in ``autojoin`` are still attempted.
    """
    self.send_presence()
    await self.get_roster()
    await self.update_info()
    for channel in self.autojoin:
        try:
            self.plugin["xep_0045"].join_muc(channel, self.nick)
        except Exception:
            # "except Exception" rather than a bare except, so that
            # KeyboardInterrupt and task cancellation are not swallowed.
            logging.getLogger(__name__).warning(
                "Could not join %s", channel, exc_info=True
            )
async def update_info(self):
    """Publish the bot's vCard and its avatar, read from ``angel.png``.

    The four publish requests are started together and awaited, so their
    results are retrieved instead of being dropped. A failed request is
    logged and does not raise.

    Raises:
        OSError: if ``angel.png`` cannot be read.
    """
    with open("angel.png", "rb") as avatar_file:
        avatar = avatar_file.read()

    avatar_type = "image/png"
    info = {
        "id": self.plugin["xep_0084"].generate_id(avatar),
        "type": avatar_type,
        "bytes": len(avatar),
    }

    vcard = self.plugin["xep_0054"].make_vcard()
    vcard["URL"] = "https://gt.kalli.st/czar/angel"

    # return_exceptions=True keeps publishing best-effort: one failure does
    # not cancel the other requests or abort session start-up.
    results = await asyncio.gather(
        self.plugin["xep_0054"].publish_vcard(vcard),
        self.plugin["xep_0084"].publish_avatar(avatar),
        self.plugin["xep_0153"].set_avatar(
            avatar=avatar,
            mtype=avatar_type,
        ),
        self.plugin["xep_0084"].publish_avatar_metadata([info]),
        return_exceptions=True,
    )
    for result in results:
        if isinstance(result, Exception):
            logging.getLogger(__name__).warning(
                "Could not publish profile data: %r", result
            )
async def message(self, msg):
    """Handle a "chat" or "normal" message: preview links, then apply sed.

    Stanzas containing the message-correct namespace (edits of an earlier
    message) are ignored. Links are previewed only when the message has no
    ``oob`` URL. A failure while previewing is logged and does not stop
    the sed handling.
    """
    if msg["type"] not in ("chat", "normal"):
        return
    mtype = "chat"
    sender = msg["from"].bare

    if "urn:xmpp:message-correct:0" in str(msg):
        return

    try:
        if not msg["oob"]["url"]:
            if urls := self.get_urls(msg):
                await self.parse_urls(msg, urls, sender, mtype)
    except Exception:
        logging.getLogger(__name__).warning(
            "Link preview failed for a message from %s", sender, exc_info=True
        )

    self.sed_command(msg, sender, mtype)
async def muc_message(self, msg):
    """Handle a group-chat message: preview links, then apply sed.

    Messages sent under the bot's own nickname and stanzas containing the
    message-correct namespace (edits of an earlier message) are ignored.
    Links are previewed only when the message has no ``oob`` URL. A
    failure while previewing is logged and does not stop the sed handling.
    """
    if msg["type"] not in ("groupchat", "normal"):
        return
    mtype = "groupchat"
    sender = msg["from"].bare

    # Do not react to the bot's own messages.
    if msg["mucnick"] == self.nick:
        return
    if "urn:xmpp:message-correct:0" in str(msg):
        return

    try:
        if not msg["oob"]["url"]:
            if urls := self.get_urls(msg):
                await self.parse_urls(msg, urls, sender, mtype)
    except Exception:
        logging.getLogger(__name__).warning(
            "Link preview failed for a message in %s", sender, exc_info=True
        )

    self.sed_command(msg, sender, mtype)
def main():
    """Read ``config.ini``, create the bot, connect and run until stopped.

    The ``[angel]`` section must provide ``jid`` and ``password``;
    ``autojoin`` (a whitespace-separated list of rooms) is optional.

    Raises:
        SystemExit: if ``config.ini`` cannot be read.
        KeyError: if the section or a required option is missing.
    """
    config = configparser.ConfigParser()
    # read() returns the list of files it managed to parse; an empty list
    # means the file is missing or unreadable.
    if not config.read("config.ini"):
        raise SystemExit("config.ini not found or unreadable")
    section = config["angel"]
    jid = section["jid"]
    password = section["password"]
    autojoin = section.get("autojoin", fallback="").split()
    bot = AngelBot(jid, password, autojoin=autojoin)
    bot.connect()
    bot.process(forever=True)


if __name__ == "__main__":
    main()