# NOTE(review): the three lines here ("Spaces:" / "Sleeping" / "Sleeping") were
# hosting-page status residue from the extraction, not part of the source file.
| import vk_api | |
| from dotenv import load_dotenv | |
| import os | |
| from dataclasses import dataclass | |
| import numpy as np | |
| import streamlit as st | |
| import pandas as pd | |
| from joblib import Parallel, delayed | |
load_dotenv()  # pull VK_TOKEN (and any other settings) from a local .env file into the environment
def connect_api():
    """Build an authenticated VK API client from the VK_TOKEN environment variable."""
    token = os.getenv("VK_TOKEN")
    session = vk_api.VkApi(token=token)
    return session.get_api()
vk = connect_api()  # module-level VK API client shared by every search helper below
def get_cities_db():
    """Read the towns.csv lookup table (expects city/lat/lon columns) into a DataFrame."""
    frame = pd.read_csv("towns.csv")
    return frame
cities_db = get_cities_db()  # module-level city -> coordinates table used by get_city_position
@dataclass
class Post:
    """One fetched post together with its author's resolved home city.

    Constructed positionally throughout this module as
    Post(text, city, (lat, lon), owner_id), which relies on the
    @dataclass-generated __init__ (the decorator was missing).
    """
    text: str                         # post body
    city_of_origin: str               # city title from the author's VK profile
    geolocation: tuple[float, float]  # (lat, lon) resolved from towns.csv
    owner_id: int                     # VK owner id; negative for groups, positive for users
def search_posts(query: str, num_of_posts: int, *, search_args=None) -> list[Post]:
    """Fetch up to num_of_posts newsfeed posts matching query, geolocated by author city.

    search_args: extra keyword arguments forwarded to vk.newsfeed.search.
    Posts whose author city (or its coordinates) cannot be resolved are skipped
    and counted in the progress log. Returns at most num_of_posts Post objects.
    """
    if search_args is None:  # fix: a shared mutable default {} would leak state between calls
        search_args = {}
    posts: list[Post] = []
    offset = 0
    request_count = min(num_of_posts, 200)  # VK caps a single page at 200 items
    city_none_stat = 0
    pos_none_stat = 0
    while len(posts) < num_of_posts:
        query_results = vk.newsfeed.search(q=query, count=request_count, offset=offset, **search_args)
        items = [item for item in query_results["items"] if "owner_id" in item and "text" in item]
        if not items:
            break  # feed exhausted — previously this loop could spin forever
        owner_ids = np.array([item["owner_id"] for item in items])
        cities = get_post_city(owner_ids)
        city_pos = get_city_position(cities)
        # Iterate the items themselves: the previous owner_id-keyed dict silently
        # dropped every post but one per owner within a page.
        for item in items:
            owner = item["owner_id"]
            if cities.get(owner) is None:
                city_none_stat += 1
                continue
            pos = city_pos.get(owner)
            if pos is None:
                pos_none_stat += 1
                continue
            posts.append(Post(item["text"], cities[owner], pos, owner))
        offset += request_count
        print(f"Processed {offset} posts, added {len(posts)}. City not found: {city_none_stat}, position not found: {pos_none_stat}.", flush=True)
    return posts[:num_of_posts]
def search_posts_parallel(query: str, num_of_posts: int, num_of_workers: int = 4, *, search_args=None):
    """Fetch posts like search_posts, fanning the paged VK requests out over joblib workers.

    search_args: extra keyword arguments forwarded to vk.newsfeed.search.
    Returns at most num_of_posts Post objects (only geolocated posts are kept).
    """
    if search_args is None:  # fix: a shared mutable default {} would leak state between calls
        search_args = {}
    posts: list[Post] = []
    offset = 0
    request_count = min(num_of_posts // num_of_workers + 1, 200)
    while len(posts) < num_of_posts:
        search_res = Parallel(n_jobs=num_of_workers)(
            delayed(_get_posts)(query, request_count, offset + i * request_count, search_args)
            for i in range(num_of_workers)
        )
        added = 0
        for chunk in search_res:
            posts.extend(chunk)
            added += len(chunk)
        offset += request_count * num_of_workers
        print(f"Processed {offset} posts, added {len(posts)}.", flush=True)
        if added == 0:
            # No worker produced a usable post this round; assume the feed is
            # exhausted rather than looping forever. (May stop early in the rare
            # case where an entire round was filtered out but more results exist.)
            break
    return posts[:num_of_posts]
def _get_posts(query: str, request_count: int, offset: int, search_args) -> list[Post]:
    """Worker body for search_posts_parallel: fetch one page and keep geolocated posts.

    Returns only posts whose owner's city resolved to coordinates; an unresolved
    city yields pos is None (see get_city_position) and the item is skipped.
    """
    query_results = vk.newsfeed.search(q=query, count=request_count, offset=offset, **search_args)
    items = [item for item in query_results["items"] if "owner_id" in item and "text" in item]
    # NOTE(review): keyed by owner_id, so only one post per owner per page survives
    # — presumably intentional dedup here; confirm against search_posts.
    item_dict = {item["owner_id"]: item for item in items}
    owner_ids = np.array([item["owner_id"] for item in items])
    cities = get_post_city(owner_ids)
    city_pos = get_city_position(cities)
    posts = []
    for owner, pos in city_pos.items():  # renamed from `id` to avoid shadowing the builtin
        if pos is None:  # covers both "no city on profile" and "city not in towns.csv"
            continue
        posts.append(Post(item_dict[owner]["text"], cities[owner], pos, owner))
    assert all(post.geolocation is not None for post in posts)
    return posts
def get_post_city(owner_id: np.ndarray) -> dict[int, str | None]:
    """Resolve each VK owner id to its profile city title, or None if absent.

    Negative ids are groups, positive ids are users; the two are queried through
    their respective VK endpoints and merged (the id sets cannot collide).
    """
    group_ids = -owner_id[owner_id < 0]
    user_ids = owner_id[owner_id > 0]
    assert len(group_ids) + len(user_ids) == len(owner_id)

    raw_cities: dict[int, dict | None] = {}
    if len(user_ids) > 0:
        for user in vk.users.get(user_ids=list(user_ids), fields=['city', 'country']):
            raw_cities[user["id"]] = user.get("city", None)
    if len(group_ids) > 0:
        for group in vk.groups.getById(group_ids=list(group_ids), fields=['city', 'country']):
            raw_cities[-group["id"]] = group.get("city", None)

    return {owner: None if city is None else city["title"] for owner, city in raw_cities.items()}
def get_city_position(cities: dict[int, str | None]) -> dict[int, tuple[float, float] | None]:
    """Map each owner id's city title to (lat, lon) using the module-level cities_db table.

    A city of None, or one absent from towns.csv, maps to None. The result has
    exactly the same keys as the input.
    """
    res: dict[int, tuple[float, float] | None] = {}
    # One DataFrame scan per *distinct* city instead of one per owner —
    # many owners share a city, so the repeated filtering was accidental O(n*m).
    coord_cache: dict[str, tuple[float, float] | None] = {}
    for owner, city in cities.items():
        if city is None:
            res[owner] = None
            continue
        if city not in coord_cache:
            selected = cities_db[cities_db["city"] == city]
            if len(selected) == 0:
                coord_cache[city] = None
            else:
                coord_cache[city] = (selected["lat"].iloc[0], selected["lon"].iloc[0])
        res[owner] = coord_cache[city]
    return res
def search_posts_by_pos(query: str, num_of_posts: int, city_name: str, lat: float, lon: float) -> list[Post]:
    """Fetch up to num_of_posts posts near (lat, lon), tagging each with city_name.

    All returned posts share the supplied coordinates; items without an owner_id
    get owner_id 0. VK API errors propagate to the caller.
    """
    posts: list[Post] = []
    offset = 0
    request_count = min(num_of_posts, 200)  # VK caps a single page at 200 items
    while len(posts) < num_of_posts:
        # fix: the parameter was misspelled 'longtitude'; VK's newsfeed.search
        # expects 'longitude', so the geo filter was being silently dropped.
        query_results = vk.newsfeed.search(q=query, count=request_count, offset=offset,
                                           latitude=lat, longitude=lon)
        items = [item for item in query_results["items"] if "text" in item]
        if not items:
            break  # feed exhausted — previously this loop could spin forever
        for item in items:
            posts.append(Post(item["text"], city_name, (lat, lon), item.get("owner_id", 0)))
        offset += request_count
        print(f"For city {city_name} processed {offset} posts, added {len(posts)}.", flush=True)
    return posts[:num_of_posts]