| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import base64 |
| | import os |
| | import random |
| | import re |
| | import string |
| | from logging import WARNING |
| | from random import choice, randrange, shuffle |
| | from traceback import format_exc |
| | from catbox import CatboxUploader |
| |
|
| | from pyUltroid.exceptions import DependencyMissingError |
| |
|
| | try: |
| | from aiohttp import ContentTypeError |
| | except ImportError: |
| | ContentTypeError = None |
| |
|
| | from telethon.tl import types |
| | from telethon.utils import get_display_name, get_peer_id |
| |
|
| | from .. import * |
| | from .._misc._wrappers import eor |
| |
|
| | if run_as_module: |
| | from ..dB import DEVLIST |
| | from ..dB._core import LIST |
| |
|
| | from . import some_random_headers |
| | from .helper import async_searcher |
| | from .tools import check_filename, json_parser |
| |
|
| | try: |
| | import aiohttp |
| | except ImportError: |
| | aiohttp = None |
| |
|
| | try: |
| | from PIL import Image |
| | except ImportError: |
| | Image = None |
| |
|
| | try: |
| | import cv2 |
| | except ImportError: |
| | cv2 = None |
| | try: |
| | import numpy as np |
| | except ImportError: |
| | np = None |
| |
|
| | try: |
| | from bs4 import BeautifulSoup |
| | except ImportError: |
| | BeautifulSoup = None |
| |
|
| | uploader = CatboxUploader() |
| |
|
| | async def randomchannel( |
| | tochat, channel, range1, range2, caption=None, client=ultroid_bot |
| | ): |
| | do = randrange(range1, range2) |
| | async for x in client.iter_messages(channel, add_offset=do, limit=1): |
| | caption = caption or x.text |
| | try: |
| | await client.send_message(tochat, caption, file=x.media) |
| | except BaseException: |
| | pass |
| |
|
| |
|
| | |
| |
|
| |
|
| | async def YtDataScraper(url: str): |
| | to_return = {} |
| | data = json_parser( |
| | BeautifulSoup( |
| | await async_searcher(url), |
| | "html.parser", |
| | ) |
| | .find_all("script")[41] |
| | .text[20:-1] |
| | )["contents"] |
| | _common_data = data["twoColumnWatchNextResults"]["results"]["results"]["contents"] |
| | common_data = _common_data[0]["videoPrimaryInfoRenderer"] |
| | try: |
| | description_data = _common_data[1]["videoSecondaryInfoRenderer"]["description"][ |
| | "runs" |
| | ] |
| | except (KeyError, IndexError): |
| | description_data = [{"text": "U hurrr from here"}] |
| | description = "".join( |
| | description_datum["text"] for description_datum in description_data |
| | ) |
| | to_return["title"] = common_data["title"]["runs"][0]["text"] |
| | to_return["views"] = ( |
| | common_data["viewCount"]["videoViewCountRenderer"]["shortViewCount"][ |
| | "simpleText" |
| | ] |
| | or common_data["viewCount"]["videoViewCountRenderer"]["viewCount"]["simpleText"] |
| | ) |
| | to_return["publish_date"] = common_data["dateText"]["simpleText"] |
| | to_return["likes"] = ( |
| | common_data["videoActions"]["menuRenderer"]["topLevelButtons"][0][ |
| | "toggleButtonRenderer" |
| | ]["defaultText"]["simpleText"] |
| | |
| | |
| | |
| | ) |
| | to_return["description"] = description |
| | return to_return |
| |
|
| |
|
| | |
| |
|
| |
|
| | async def google_search(query): |
| | query = query.replace(" ", "+") |
| | _base = "https://google.com" |
| | headers = { |
| | "Cache-Control": "no-cache", |
| | "Connection": "keep-alive", |
| | "User-Agent": choice(some_random_headers), |
| | } |
| | con = await async_searcher(_base + "/search?q=" + query, headers=headers) |
| | soup = BeautifulSoup(con, "html.parser") |
| | result = [] |
| | pdata = soup.find_all("a", href=re.compile("url=")) |
| | for data in pdata: |
| | if not data.find("div"): |
| | continue |
| | try: |
| | result.append( |
| | { |
| | "title": data.find("div").text, |
| | "link": data["href"].split("&url=")[1].split("&ved=")[0], |
| | "description": data.find_all("div")[-1].text, |
| | } |
| | ) |
| | except BaseException as er: |
| | LOGS.exception(er) |
| | return result |
| |
|
| |
|
| | |
| |
|
| |
|
| | async def allcmds(event, telegraph): |
| | txt = "" |
| | for z in LIST.keys(): |
| | txt += f"PLUGIN NAME: {z}\n" |
| | for zz in LIST[z]: |
| | txt += HNDLR + zz + "\n" |
| | txt += "\n\n" |
| | t = telegraph.create_page(title="Ultroid All Cmds", content=[txt]) |
| | await eor(event, f"All Ultroid Cmds : [Click Here]({t['url']})", link_preview=False) |
| |
|
| |
|
| | async def ReTrieveFile(input_file_name): |
| | if not aiohttp: |
| | raise DependencyMissingError("This function needs 'aiohttp' to be installed.") |
| | RMBG_API = udB.get_key("RMBG_API") |
| | headers = {"X-API-Key": RMBG_API} |
| | files = {"image_file": open(input_file_name, "rb").read()} |
| | async with aiohttp.ClientSession() as ses: |
| | async with ses.post( |
| | "https://api.remove.bg/v1.0/removebg", headers=headers, data=files |
| | ) as out: |
| | contentType = out.headers.get("content-type") |
| | if "image" not in contentType: |
| | return False, (await out.json()) |
| |
|
| | name = check_filename("ult-rmbg.png") |
| | with open(name, "wb") as file: |
| | file.write(await out.read()) |
| | return True, name |
| |
|
| |
|
| | |
| | |
| |
|
| |
|
| | async def unsplashsearch(query, limit=None, shuf=True): |
| | query = query.replace(" ", "-") |
| | link = "https://unsplash.com/s/photos/" + query |
| | extra = await async_searcher(link, re_content=True) |
| | res = BeautifulSoup(extra, "html.parser", from_encoding="utf-8") |
| | all_ = res.find_all("img", srcset=re.compile("images.unsplash.com/photo")) |
| | if shuf: |
| | shuffle(all_) |
| | return list(map(lambda e: e['src'], all_[:limit])) |
| |
|
| |
|
| | |
| | |
| |
|
| |
|
| | async def get_random_user_data(): |
| | base_url = "https://randomuser.me/api/" |
| | cc = await async_searcher( |
| | "https://random-data-api.com/api/business_credit_card/random_card", re_json=True |
| | ) |
| | card = ( |
| | "**CARD_ID:** " |
| | + str(cc["credit_card_number"]) |
| | + f" {cc['credit_card_expiry_date']}\n" |
| | + f"**C-ID :** {cc['id']}" |
| | ) |
| | data_ = (await async_searcher(base_url, re_json=True))["results"][0] |
| | _g = data_["gender"] |
| | gender = "🤵🏻♂" if _g == "male" else "🤵🏻♀" |
| | name = data_["name"] |
| | loc = data_["location"] |
| | dob = data_["dob"] |
| | msg = """ |
| | {} **Name:** {}.{} {} |
| | **Street:** {} {} |
| | **City:** {} |
| | **State:** {} |
| | **Country:** {} |
| | **Postal Code:** {} |
| | **Email:** {} |
| | **Phone:** {} |
| | **Card:** {} |
| | **Birthday:** {} |
| | """.format( |
| | gender, |
| | name["title"], |
| | name["first"], |
| | name["last"], |
| | loc["street"]["number"], |
| | loc["street"]["name"], |
| | loc["city"], |
| | loc["state"], |
| | loc["country"], |
| | loc["postcode"], |
| | data_["email"], |
| | data_["phone"], |
| | card, |
| | dob["date"][:10], |
| | ) |
| | pic = data_["picture"]["large"] |
| | return msg, pic |
| |
|
| |
|
| | |
| |
|
| |
|
| | async def get_synonyms_or_antonyms(word, type_of_words): |
| | if type_of_words not in ["synonyms", "antonyms"]: |
| | return "Dude! Please give a corrent type of words you want." |
| | s = await async_searcher( |
| | f"https://tuna.thesaurus.com/pageData/{word}", re_json=True |
| | ) |
| | li_1 = [ |
| | y |
| | for x in [ |
| | s["data"]["definitionData"]["definitions"][0][type_of_words], |
| | s["data"]["definitionData"]["definitions"][1][type_of_words], |
| | ] |
| | for y in x |
| | ] |
| | return [y["term"] for y in li_1] |
| |
|
| |
|
| | |
| |
|
| |
|
| | class Quotly: |
| | _API = "https://quoteampi.onrender.com/generate" |
| | _entities = { |
| | types.MessageEntityPhone: "phone_number", |
| | types.MessageEntityMention: "mention", |
| | types.MessageEntityBold: "bold", |
| | types.MessageEntityCashtag: "cashtag", |
| | types.MessageEntityStrike: "strikethrough", |
| | types.MessageEntityHashtag: "hashtag", |
| | types.MessageEntityEmail: "email", |
| | types.MessageEntityMentionName: "text_mention", |
| | types.MessageEntityUnderline: "underline", |
| | types.MessageEntityUrl: "url", |
| | types.MessageEntityTextUrl: "text_link", |
| | types.MessageEntityBotCommand: "bot_command", |
| | types.MessageEntityCode: "code", |
| | types.MessageEntityPre: "pre", |
| | types.MessageEntitySpoiler: "spoiler", |
| | } |
| |
|
| | async def _format_quote(self, event, reply=None, sender=None, type_="private"): |
| | async def telegraph(file_): |
| | file = file_ + ".png" |
| | Image.open(file_).save(file, "PNG") |
| | uri = uploader.upload_file(file) |
| | os.remove(file) |
| | os.remove(file_) |
| | return uri |
| |
|
| | if reply and reply.raw_text: |
| | reply = { |
| | "name": get_display_name(reply.sender) or "Deleted Account", |
| | "text": reply.raw_text, |
| | "chatId": reply.chat_id, |
| | } |
| | else: |
| | reply = {} |
| | is_fwd = event.fwd_from |
| | name = None |
| | last_name = None |
| | if sender and sender.id not in DEVLIST: |
| | id_ = get_peer_id(sender) |
| | elif not is_fwd: |
| | id_ = event.sender_id |
| | sender = await event.get_sender() |
| | else: |
| | id_, sender = None, None |
| | name = is_fwd.from_name |
| | if is_fwd.from_id: |
| | id_ = get_peer_id(is_fwd.from_id) |
| | try: |
| | sender = await event.client.get_entity(id_) |
| | except ValueError: |
| | pass |
| | if sender: |
| | name = get_display_name(sender) |
| | if hasattr(sender, "last_name"): |
| | last_name = sender.last_name |
| | entities = [] |
| | if event.entities: |
| | for entity in event.entities: |
| | if type(entity) in self._entities: |
| | enti_ = entity.to_dict() |
| | del enti_["_"] |
| | enti_["type"] = self._entities[type(entity)] |
| | entities.append(enti_) |
| | text = event.raw_text |
| | if isinstance(event, types.MessageService): |
| | if isinstance(event.action, types.MessageActionGameScore): |
| | text = f"scored {event.action.score}" |
| | rep = await event.get_reply_message() |
| | if rep and rep.game: |
| | text += f" in {rep.game.title}" |
| | elif isinstance(event.action, types.MessageActionPinMessage): |
| | text = "pinned a message." |
| | |
| | message = { |
| | "entities": entities, |
| | "chatId": id_, |
| | "avatar": True, |
| | "from": { |
| | "id": id_, |
| | "first_name": (name or (sender.first_name if sender else None)) |
| | or "Deleted Account", |
| | "last_name": last_name, |
| | "username": sender.username if sender else None, |
| | "language_code": "en", |
| | "title": name, |
| | "name": name or "Deleted Account", |
| | "type": type_, |
| | }, |
| | "text": text, |
| | "replyMessage": reply, |
| | } |
| | if event.document and event.document.thumbs: |
| | file_ = await event.download_media(thumb=-1) |
| | uri = await telegraph(file_) |
| | message["media"] = {"url": uri} |
| |
|
| | return message |
| |
|
| | async def create_quotly( |
| | self, |
| | event, |
| | url="https://bot.lyo.su/quote/generate", |
| | reply={}, |
| | bg=None, |
| | sender=None, |
| | file_name="quote.webp", |
| | ): |
| | """Create quotely's quote.""" |
| | if not isinstance(event, list): |
| | event = [event] |
| | from .. import udB |
| |
|
| | if udB.get_key("OQAPI"): |
| | url = Quotly._API |
| | if not bg: |
| | bg = "#1b1429" |
| | content = { |
| | "type": "quote", |
| | "format": "webp", |
| | "backgroundColor": bg, |
| | "width": 512, |
| | "height": 768, |
| | "scale": 2, |
| | "messages": [ |
| | await self._format_quote(message, reply=reply, sender=sender) |
| | for message in event |
| | ], |
| | } |
| | try: |
| | request = await async_searcher(url, post=True, json=content, re_json=True) |
| | except ContentTypeError as er: |
| | if url != self._API: |
| | return await self.create_quotly( |
| | event, |
| | url=self._API, |
| | bg=bg, |
| | sender=sender, |
| | reply=reply, |
| | file_name=file_name, |
| | ) |
| | raise er |
| | if request.get("ok"): |
| | with open(file_name, "wb") as file: |
| | image = base64.decodebytes(request["result"]["image"].encode("utf-8")) |
| | file.write(image) |
| | return file_name |
| | raise Exception(str(request)) |
| |
|
| |
|
| | def split_list(List, index): |
| | new_ = [] |
| | while List: |
| | new_.extend([List[:index]]) |
| | List = List[index:] |
| | return new_ |
| |
|
| |
|
| | |
| |
|
| |
|
| | def rotate_image(image, angle): |
| | if not cv2: |
| | raise DependencyMissingError("This function needs 'cv2' to be installed!") |
| | image_center = tuple(np.array(image.shape[1::-1]) / 2) |
| | rot_mat = cv2.getRotationMatrix2D(image_center, angle, 1.0) |
| | return cv2.warpAffine(image, rot_mat, image.shape[1::-1], flags=cv2.INTER_LINEAR) |
| |
|
| |
|
| | def random_string(length=3): |
| | """Generate random string of 'n' Length""" |
| | return "".join(random.choices(string.ascii_uppercase, k=length)) |
| |
|
| |
|
| | setattr(random, "random_string", random_string) |
| |
|