import vk_api
from dotenv import load_dotenv
import os
from dataclasses import dataclass
import numpy as np
import streamlit as st
import pandas as pd
from joblib import Parallel, delayed
# Pull VK_TOKEN (and anything else) from a local .env file into the environment.
load_dotenv()


@st.cache_resource
def connect_api():
    """Build the VK API client once per Streamlit session.

    Reads the service token from the VK_TOKEN environment variable and
    returns the ready-to-call API object.
    """
    token = os.getenv("VK_TOKEN")
    session = vk_api.VkApi(token=token)
    return session.get_api()


# Shared client used by every search helper below.
vk = connect_api()
@st.cache_resource
def get_cities_db():
    """Load the city-coordinates lookup table from towns.csv.

    Cached by Streamlit so the CSV is read once per session, not on every
    script rerun. The table is expected to have "city", "lat" and "lon"
    columns (see get_city_position).
    """
    return pd.read_csv("towns.csv")
# Module-level DataFrame used for city -> (lat, lon) lookups.
cities_db = get_cities_db()
@dataclass
class Post:
    """A VK post annotated with its author's home-city geodata."""

    text: str  # raw post text as returned by the VK API
    city_of_origin: str  # city title from the owner's VK profile
    geolocation: tuple[float, float]  # (lat, lon) resolved via the towns table
    owner_id: int  # VK id of the author: positive for users, negative for groups
def search_posts(query: str, num_of_posts: int, *, search_args: dict | None = None) -> list[Post]:
    """Search the VK newsfeed and return up to num_of_posts geotagged Posts.

    Pages through vk.newsfeed.search, resolves each author's profile city and
    its coordinates, and keeps only posts whose city and position are known.

    Args:
        query: full-text search string passed to the VK API.
        num_of_posts: maximum number of posts to return.
        search_args: extra keyword arguments forwarded to vk.newsfeed.search.

    Returns:
        At most num_of_posts Post objects with non-None geolocation.
    """
    # None sentinel instead of a mutable {} default shared across calls.
    if search_args is None:
        search_args = {}
    posts: list[Post] = []
    offset = 0
    request_count = min(num_of_posts, 200)  # 200 is the VK per-request cap
    city_none_stat = 0
    pos_none_stat = 0
    while len(posts) < num_of_posts:
        query_results = vk.newsfeed.search(q=query, count=request_count, offset=offset, **search_args)
        items = [item for item in query_results["items"] if "owner_id" in item and "text" in item]
        if not items:
            break  # feed exhausted -- without this the loop would spin forever
        owner_ids = np.array([item["owner_id"] for item in items])
        cities = get_post_city(owner_ids)
        city_pos = get_city_position(cities)
        # Iterate the items themselves, not a {owner_id: item} dict: one owner
        # may have several posts in a page, and the old dict kept only the last.
        for item in items:
            oid = item["owner_id"]
            city = cities.get(oid)
            if city is None:
                city_none_stat += 1
                continue
            pos = city_pos.get(oid)
            if pos is None:
                pos_none_stat += 1
                continue
            posts.append(Post(item["text"], city, pos, oid))
        offset += request_count
        print(f"Processed {offset} posts, added {len(posts)}. City not found: {city_none_stat}, position not found: {pos_none_stat}.", flush=True)
    return posts[:num_of_posts]
def search_posts_parallel(query: str, num_of_posts: int, num_of_workers: int = 4, *, search_args: dict | None = None) -> list[Post]:
    """Like search_posts, but fetches pages with num_of_workers joblib workers.

    Each round dispatches num_of_workers consecutive page offsets to
    _get_posts in parallel and merges the results.

    Args:
        query: full-text search string passed to the VK API.
        num_of_posts: maximum number of posts to return.
        num_of_workers: number of parallel joblib jobs per round.
        search_args: extra keyword arguments forwarded to vk.newsfeed.search.

    Returns:
        At most num_of_posts geotagged Post objects.
    """
    # None sentinel instead of a mutable {} default shared across calls.
    if search_args is None:
        search_args = {}
    posts: list[Post] = []
    offset = 0
    request_count = min(num_of_posts // num_of_workers + 1, 200)  # 200 = VK cap
    while len(posts) < num_of_posts:
        batches = Parallel(n_jobs=num_of_workers)(
            delayed(_get_posts)(query, request_count, offset + i * request_count, search_args)
            for i in range(num_of_workers)
        )
        if not any(batches):
            break  # no worker found anything -- avoid looping forever
        for batch in batches:
            posts.extend(batch)
        offset += request_count * num_of_workers
        print(f"Processed {offset} posts, added {len(posts)}.", flush=True)
    return posts[:num_of_posts]
def _get_posts(query: str, request_count: int, offset: int, search_args) -> list[Post]:
    """Worker for search_posts_parallel: fetch and geotag one result page.

    Posts whose author's city cannot be mapped to coordinates are dropped.

    Args:
        query: full-text search string.
        request_count: page size for vk.newsfeed.search.
        offset: page offset into the search results.
        search_args: extra keyword arguments forwarded to vk.newsfeed.search.

    Returns:
        Geotagged Post objects for this page (geolocation is never None).
    """
    query_results = vk.newsfeed.search(q=query, count=request_count, offset=offset, **search_args)
    items = [item for item in query_results["items"] if "owner_id" in item and "text" in item]
    owner_ids = np.array([item["owner_id"] for item in items])
    cities = get_post_city(owner_ids)
    city_pos = get_city_position(cities)
    posts: list[Post] = []
    # Iterate the items themselves, not a {owner_id: item} dict: one owner may
    # have several posts in a page, and the old dict kept only the last one.
    for item in items:
        oid = item["owner_id"]
        pos = city_pos.get(oid)
        if pos is None:
            continue  # covers both "city unknown" and "city not in towns.csv"
        posts.append(Post(item["text"], cities[oid], pos, oid))
    return posts
def get_post_city(owner_id: np.ndarray) -> dict[int, str | None]:
    """Resolve the profile city for a batch of VK owner ids.

    Per VK convention, negative ids denote groups and positive ids denote
    users; each kind is resolved with one batched API call.

    Args:
        owner_id: array of VK owner ids (may mix users and groups).

    Returns:
        Mapping from owner id to the city title, or None when the profile has
        no city set. An id of 0 (neither user nor group) yields no entry
        instead of the previous hard AssertionError.
    """
    group_ids = -owner_id[owner_id < 0]
    user_ids = owner_id[owner_id > 0]
    # print(group_ids, user_ids, owner_id, flush=True)
    city_by_owner: dict[int, dict | None] = {}
    if len(group_ids) > 0:
        groups = vk.groups.getById(group_ids=list(group_ids), fields=['city', 'country'])
        # restore the negative sign so keys match the incoming owner ids
        city_by_owner.update({-group["id"]: group.get("city") for group in groups})
    if len(user_ids) > 0:
        users = vk.users.get(user_ids=list(user_ids), fields=['city', 'country'])
        city_by_owner.update({user["id"]: user.get("city") for user in users})
    # VK returns the city as {"id": ..., "title": ...}; keep only the title.
    return {oid: city["title"] if city is not None else None for oid, city in city_by_owner.items()}
def get_city_position(cities: dict[int, str | None]) -> dict[int, tuple[float, float] | None]:
    """Map each owner id to its city's (lat, lon) via the cities_db table.

    Args:
        cities: mapping from owner id to city title (or None).

    Returns:
        Mapping with the same keys; the value is (lat, lon) from cities_db,
        or None when the city is unknown or absent from the table.
    """
    positions: dict[int, tuple[float, float] | None] = {}
    for owner, city in cities.items():
        coords = None
        if city is not None:
            match = cities_db[cities_db["city"] == city]
            if len(match) > 0:
                coords = (match["lat"].iloc[0], match["lon"].iloc[0])
        positions[owner] = coords
    assert len(cities) == len(positions)
    return positions
def search_posts_by_pos(query: str, num_of_posts: int, city_name: str, lat: float, lon: float) -> list[Post]:
    """Search posts geotagged near (lat, lon), labelling them all with city_name.

    Args:
        query: full-text search string.
        num_of_posts: maximum number of posts to return.
        city_name: city label assigned to every returned Post.
        lat: latitude of the search center.
        lon: longitude of the search center.

    Returns:
        At most num_of_posts Post objects located at (lat, lon). Posts with a
        missing owner_id get owner_id 0.
    """
    posts: list[Post] = []
    offset = 0
    request_count = min(num_of_posts, 200)  # 200 is the VK per-request cap
    while len(posts) < num_of_posts:
        # "longitude" is the parameter name documented by VK; the original
        # sent the misspelled "longtitude", silently dropping the filter.
        query_results = vk.newsfeed.search(q=query, count=request_count, offset=offset,
                                           latitude=lat, longitude=lon)
        items = [item for item in query_results["items"] if "text" in item]
        if not items:
            break  # no further results -- avoid looping forever
        for item in items:
            posts.append(Post(item["text"], city_name, (lat, lon), item.get("owner_id", 0)))
        offset += request_count
        print(f"For city {city_name} processed {offset} posts, added {len(posts)}.", flush=True)
    return posts[:num_of_posts]