import vk_api
from dotenv import load_dotenv
import os
from dataclasses import dataclass
import numpy as np
import streamlit as st
import pandas as pd
from joblib import Parallel, delayed

load_dotenv()

@st.cache_resource
def connect_api():
    service_token = os.getenv("VK_TOKEN")
    return vk_api.VkApi(token=service_token).get_api()

vk = connect_api()

@st.cache_data
def get_cities_db() -> pd.DataFrame:
    # st.cache_data is the cache Streamlit recommends for DataFrames;
    # cache_resource would share one mutable object across all sessions.
    return pd.read_csv("towns.csv")

cities_db = get_cities_db()


@dataclass
class Post:
    text: str
    city_of_origin: str
    geolocation: tuple[float, float]
    # likes: int
    owner_id: int
    # group_owned: bool = False

def search_posts(query: str, num_of_posts: int, *, search_args: dict | None = None) -> list[Post]:
    if search_args is None:  # avoid a shared mutable default argument
        search_args = {}
    posts: list[Post] = []
    offset = 0
    request_count = min(num_of_posts, 200)
    city_none_stat = 0
    pos_none_stat = 0
    while len(posts) < num_of_posts:
        query_results = vk.newsfeed.search(q=query, count=request_count, offset=offset, **search_args)
        if not query_results["items"]:  # no more results: stop instead of looping forever
            break
        items = [item for item in query_results["items"] if "owner_id" in item and "text" in item]
        item_dict = {item["owner_id"]: item for item in items}
        # print(query_results, items, flush=True)
        owner_ids = np.array([item["owner_id"] for item in items])
        cities = get_post_city(owner_ids)
        city_pos = get_city_position(cities)
        # likes = item.get("likes", {"count": 0})["count"]
        for owner, pos in city_pos.items():  # "owner" avoids shadowing the builtin id()
            if cities[owner] is None:
                city_none_stat += 1
                continue
            if pos is None:
                pos_none_stat += 1
                continue
            posts.append(Post(item_dict[owner]["text"], cities[owner], pos, owner))
        offset += request_count
        print(f"Processed {offset} posts, added {len(posts)}. City not found: {city_none_stat}, position not found: {pos_none_stat}.", flush=True)
    return posts[:num_of_posts]


def search_posts_parallel(query: str, num_of_posts: int, num_of_workers: int = 4, *, search_args: dict | None = None) -> list[Post]:
    if search_args is None:  # avoid a shared mutable default argument
        search_args = {}
    posts: list[Post] = []
    offset = 0
    request_count = min(num_of_posts // num_of_workers + 1, 200)
    while len(posts) < num_of_posts:
        search_res = Parallel(n_jobs=num_of_workers)(
            delayed(_get_posts)(query, request_count, offset + i * request_count, search_args)
            for i in range(num_of_workers)
        )
        if not any(search_res):  # every worker came back empty: stop instead of looping forever
            break
        for p in search_res:
            posts.extend(p)
            # print(*[pp.geolocation for pp in p])
        offset += request_count * num_of_workers
        print(f"Processed {offset} posts, added {len(posts)}.", flush=True)
    return posts[:num_of_posts]


def _get_posts(query: str, request_count: int, offset: int, search_args: dict) -> list[Post]:
    posts: list[Post] = []
    query_results = vk.newsfeed.search(q=query, count=request_count, offset=offset, **search_args)
    items = [item for item in query_results["items"] if "owner_id" in item and "text" in item]
    item_dict = {item["owner_id"]: item for item in items}
    # print(query_results, items, flush=True)
    owner_ids = np.array([item["owner_id"] for item in items])
    cities = get_post_city(owner_ids)
    city_pos = get_city_position(cities)
    # likes = item.get("likes", {"count": 0})["count"]
    for owner, pos in city_pos.items():  # "owner" avoids shadowing the builtin id()
        if pos is None:
            continue
        posts.append(Post(item_dict[owner]["text"], cities[owner], pos, owner))
    assert all(post.geolocation is not None for post in posts)
    return posts


def get_post_city(owner_ids: np.ndarray) -> dict[int, str | None]:
    # Negative owner ids are groups, positive ones are users.
    group_ids = -owner_ids[owner_ids < 0]
    user_ids = owner_ids[owner_ids > 0]
    assert len(group_ids) + len(user_ids) == len(owner_ids)
    # print(group_ids, user_ids, owner_id, flush=True)
    if len(group_ids) > 0:
        groups = vk.groups.getById(group_ids=list(group_ids), fields=['city', 'country'])
        groups_dict = {-group["id"]: group.get("city", None) for group in groups}
    else:
        groups_dict = {}
    if len(user_ids) > 0:
        users = vk.users.get(user_ids=list(user_ids), fields=['city', 'country'])
        users_dict = {user["id"]: user.get("city", None) for user in users}
    else:
        users_dict = {}
    
    users_dict.update(groups_dict)
    return {id: city["title"] if city is not None else None for id, city in users_dict.items()}
    

def get_city_position(cities: dict[int, str | None]) -> dict[int, tuple[float, float] | None]:
    res = {}
    for owner, city in cities.items():
        if city is None:
            res[owner] = None
            continue
        selected = cities_db[cities_db["city"] == city]
        if len(selected) == 0:  # city is missing from the local towns.csv database
            res[owner] = None
            continue
        res[owner] = (selected["lat"].iloc[0], selected["lon"].iloc[0])
    assert len(cities) == len(res)
    return res


def search_posts_by_pos(query: str, num_of_posts: int, city_name: str, lat: float, lon: float) -> list[Post]:
    posts: list[Post] = []
    offset = 0
    request_count = min(num_of_posts, 200)
    while len(posts) < num_of_posts:
        # Note: the VK API parameter is spelled "longitude", not "longtitude".
        query_results = vk.newsfeed.search(q=query, count=request_count, offset=offset, latitude=lat, longitude=lon)
        if not query_results["items"]:  # no more results: stop instead of looping forever
            break
        items = [item for item in query_results["items"] if "text" in item]
        for item in items:
            posts.append(Post(item["text"], city_name, (lat, lon), item.get("owner_id", 0)))
        offset += request_count
        print(f"For city {city_name} processed {offset} posts, added {len(posts)}.", flush=True)
    return posts[:num_of_posts]