A script for downloading all the data from a vk.com group.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

545 lines
17 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019, Maxim Lihachev, <envrm@yandex.ru>
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import io
import os
import re
import time
import click
import vk_api
import datetime
import urllib.request
from string import Template
from progressbar import *
############################################################################
class Color:
    """ANSI escape sequences used to colorize terminal output."""

    # NOTE: SGR code 47 selects a white *background*, not a gray foreground.
    GRAY = '\033[1;47m'

    # Bold foreground colors.
    GREEN = '\033[1;32m'
    YELLOW = '\033[1;33m'
    CYAN = '\033[1;36m'
    MAGENTA = '\033[1;35m'

    # Switches every attribute back to the terminal default.
    RESET = '\033[0m'
def printc(color, text, end='\n'):
    """Print *text* wrapped in one of the ``Color`` escape sequences.

    color -- name of a ``Color`` attribute, e.g. ``'GREEN'``; it is looked up
             with ``getattr``, so an unknown name raises ``AttributeError``.
    text  -- value to print.
    end   -- passed straight through to ``print``.
    """
    prefix = getattr(Color, color)
    print(f"{prefix}{text}{Color.RESET}", end=end)
def timestamp_to_date(timestamp, fmt='%Y-%m-%d %H:%M:%S'):
    """Convert a Unix timestamp into a human-readable date string.

    timestamp -- anything ``int()`` accepts (number or numeric string).
    fmt       -- ``strftime`` format of the result.

    The conversion goes through ``datetime.fromtimestamp`` without a tz
    argument, so the result is expressed in the process's local time zone.
    """
    seconds = int(timestamp)
    moment = datetime.datetime.fromtimestamp(seconds)
    return moment.strftime(fmt)
############################################################################
def auth(login, password):
    """Authenticate against vk.com.

    Returns the object produced by ``VkApi.get_api()`` on success. When
    ``vk_api.AuthError`` is raised the error is printed and ``None`` is
    returned, so callers have to check the result.
    """
    session = vk_api.VkApi(login, password)
    try:
        session.auth()
        api = session.get_api()
    except vk_api.AuthError as err:
        print(err)
        return None
    return api
############################################################################
class Group(object):
    """A vk.com community: its metadata, links and wall posts.

    Constructing an instance performs the API calls immediately
    (``groups.getById``, optionally ``groups.getSettings``, and ``wall.get``
    through the module-level ``tools`` helper).

    ``template_group`` and ``info_file`` are not derived here; the caller must
    assign them before calling ``save_info()``.
    """

    def __init__(self, vk, url):
        """Load everything about the community addressed by *url*.

        vk  -- API object exposing ``groups.getById`` / ``groups.getSettings``.
        url -- community URL; its last path segment is used as the group name.
        """
        self.vk = vk
        self.url = url

        # Filled in by the caller before save_info() is used.
        self.template_group = None
        self.info_file = None

        self.info = self._get_info(self._get_name(url))
        self.id = self.info['id']
        self.name = self.info['screen_name']
        self.title = self.info['name']
        self.status = self.info['status']
        self.avatar = self.info['photo_100']
        self.avatar_full = self._largest_photo(self.info)
        self.links = [self._format_link(link) for link in self.info.get('links', [])]

        # The description is only exposed through the admin-only settings call.
        if self.info['is_admin'] != 0:
            self.settings = self._get_settings(self.id)
            self.description = self.settings['description']
        else:
            self.description = ""

        self.wall = self._get_wall()

    @staticmethod
    def _format_link(link):
        """Serialize one community link as ``url+@+name | desc+@+icon``.

        A missing ``desc`` becomes an empty string and a missing ``photo_50``
        becomes the literal ``NOICON``.
        """
        return (link['url'] + '+@+'
                + link['name'] + ' | '
                + link.get('desc', '') + '+@+'
                + link.get('photo_50', 'NOICON'))

    @staticmethod
    def _largest_photo(photo):
        """Return the URL stored under the largest ``photo_<N>`` key.

        Only keys whose suffix is purely numeric are considered, so keys such
        as ``photo_max`` are ignored instead of breaking the numeric sort.

        Raises ``ValueError`` when *photo* has no ``photo_<N>`` key at all.
        """
        by_size = {}
        for key in photo:
            matched = re.fullmatch(r'photo_(\d+)', key)
            if matched:
                by_size[int(matched.group(1))] = photo[key]
        if not by_size:
            raise ValueError('no photo_<size> fields found')
        return by_size[max(by_size)]

    @staticmethod
    def _get_name(url):
        """Return the community's HTTP name: the last path segment of *url*.

        Trailing slashes are ignored so ``https://vk.com/name/`` works too.
        """
        return url.rstrip('/').split('/')[-1]

    def _get_info(self, name):
        """Fetch public information about the community.

        Calls ``groups.getById`` with ``extended=1`` and the extra fields
        ``status,links`` and returns the first element of the response.

        Example response:
        {
            'screen_name': 'havelove_willtravel',
            'name': 'name',
            'is_member': 0,
            'is_advertiser': 0,
            'is_admin': 0,
            'type': 'group',
            'id': 91087679,
            'is_closed': 0,
            'photo_50': 'https://pp.userapi.com/JGKzwLMZQ9s.jpg?ava=1',
            'photo_100': 'https://pp.userapi.com/4M20VvCkRV8.jpg?ava=1',
            'photo_200': 'https://pp.userapi.com/7fucdcg44a4.jpg?ava=1',
            'status': 'status',
            'links': [
                {
                    'url': 'url',
                    'desc': 'envrm.info',
                    'id': 84867105,
                    'name': 'name',
                    'edit_title': 1
                }
            ],
        }
        """
        return self.vk.groups.getById(
            group_ids=name,
            extended=1,
            fields="status,links"
        )[0]

    def _get_settings(self, id):
        """Fetch the community settings that are visible to administrators.

        Returns the raw ``groups.getSettings`` response for group *id*.

        Example response:
        {
            'wiki': 2,
            'city_id': 0,
            'topics': 0,
            'video': 2,
            'sections_list': [[0, 'section']],
            'obscene_stopwords': 0,
            'subject': 25,
            'obscene_filter': 0,
            'age_limits': 1,
            'title': 'title',
            'country_id': 0,
            'access': 0,
            'wall': 2,
            'market': {'enabled': 0},
            'website': '',
            'description': '',
            'docs': 0,
            'secondary_section': 0,
            'photos': 2,
            'address': 'havelove_willtravel',
            'main_section': 0,
            'articles': 1,
            'obscene_words': [],
            'messages': 0,
            'action_button': {},
            'subject_list': [],
        }
        """
        return self.vk.groups.getSettings(group_id=id)

    def _get_wall(self):
        """Fetch the community's wall posts via the module-level ``tools``.

        The wall is requested with ``owner_id`` set to the negated group id.
        """
        return tools.get_all('wall.get', 100, {'owner_id': -self.id})

    def save_info(self):
        """Render the group-info template and write it to ``info_file``.

        Raises ``ValueError`` if ``template_group`` or ``info_file`` has not
        been assigned yet.
        """
        if self.template_group is None or self.info_file is None:
            raise ValueError('template_group and info_file must be set before save_info()')

        data = {
            'name': self.name,
            'status': self.status,
            # Newlines are flattened to spaces before substitution.
            'description': self.description.replace('\n', ' '),
            'logo': self.avatar,
            'logo_full': self.avatar_full,
            'url': self.url,
            # One "- " item per link; an empty list still yields a bare "\n- ".
            'links': "\n- " + "\n- ".join(self.links)
        }
        TMPL(self.template_group, data).write(self.info_file)
class TMPL(object):
    """Markdown text produced by filling in a ``string.Template`` file."""

    def __init__(self, template, data):
        """Read the UTF-8 file *template* and substitute *data* into it.

        The rendered text is kept in ``self.md``. ``Template.substitute`` is
        used, so a placeholder missing from *data* raises ``KeyError``.
        """
        with io.open(template, mode="r", encoding="utf-8") as source:
            raw = source.read()
        self.md = Template(raw).substitute(data)

    def write(self, filename):
        """Write the rendered text to *filename* as UTF-8, overwriting it."""
        with io.open(filename, mode="w", encoding="utf-8") as target:
            target.write(self.md)
class Album(object):
    """A vk.com photo album attached to a post.

    Constructing an instance downloads the album's photo list through the
    module-level ``tools`` helper (``photos.get``).
    """

    def __init__(self, album):
        """Build the album from the API's album dictionary.

        Reads the ``title``, ``description``, ``thumb``, ``id`` and
        ``owner_id`` keys of *album*.
        """
        self.title = album['title']
        self.description = album['description']
        self.thumbnail = self._largest_photo(album['thumb'])
        self.tags = self._parse_tags(self.title)

        request = {'owner_id': album['owner_id'], 'album_id': album['id']}
        self.photos = [Photo(item) for item in tools.get_all('photos.get', 100, request)['items']]

    @staticmethod
    def _parse_tags(title):
        """Extract tags from a title of the form ``<name> - tag1, tag2``.

        Everything after the last `` - `` is split on commas and stripped;
        empty entries are dropped. A title without `` - `` has no tags.
        """
        if ' - ' not in title:
            return []
        tail = re.search(r".* - (.*)", title).group(1)
        return [tag.strip() for tag in tail.split(",") if tag.strip()]

    @staticmethod
    def _largest_photo(photo):
        """Return the URL of the widest entry in ``photo['sizes']``.

        With equal widths the first such entry wins. Raises ``ValueError``
        when the size list is empty.
        """
        return max(photo['sizes'], key=lambda size: size['width'])['url']

    def archive(self, directory):
        """Download the cover and every photo into ``<directory>/<title>``.

        Photos are saved as ``001.jpg``, ``002.jpg``, ...; a photo with a
        non-empty description also gets a UTF-8 ``NNN.txt`` file beside it.
        """
        # NOTE(review): the title is used verbatim as a directory name, so a
        # path separator in it would create nested directories — confirm
        # whether titles need sanitizing.
        img_directory = os.path.join(directory, self.title)
        os.makedirs(img_directory, exist_ok=True)

        urllib.request.urlretrieve(self.thumbnail, os.path.join(img_directory, "cover.jpg"))

        bar_elements = [f'\t{Color.MAGENTA}Альбом. Фотографии:{Color.RESET} ', Percentage(), f' {Color.CYAN}', Bar(), f'{Color.RESET} ', ETA()]
        bar = ProgressBar(widgets=bar_elements, maxval=len(self.photos)).start()

        for index, photo in enumerate(self.photos, 1):
            file_name = os.path.join(img_directory, str(index).zfill(3))
            urllib.request.urlretrieve(photo.url, file_name + ".jpg")
            if photo.description:
                # Explicit encoding: descriptions are typically non-ASCII and
                # the locale default is not guaranteed to be able to store them.
                with open(file_name + ".txt", "w", encoding="utf-8") as info_file:
                    info_file.write(photo.description)
            bar.update(index)

        print("")
class Photo(object):
    """A vk.com photo.

    Example of the API dictionary this class is built from:
    {
        'id': 456243888,
        'album_id': 259402486,
        'date': 1545675847,
        'text': '',
        'height': 1464,
        'width': 2448,
        'user_id': 100,
        'owner_id': -91087679,
        'sizes': [{
            'type': 'm',
            'url': 'https://sun9-27.userapi.com/c851124/v851124613/1b85ca/UacqE2Zs2tc.jpg',
            'width': 130,
            'height': 87
        }, {
            'type': 'o',
            'url': 'https://sun9-9.userapi.com/c851124/v851124613/1b85cf/59MhcAHsMKg.jpg',
            'width': 130,
            'height': 87
        }, {
            'type': 'p',
            'url': 'https://sun9-57.userapi.com/c851124/v851124613/1b85d0/Ue4FW1UkwFE.jpg',
            'width': 200,
            'height': 133
        }]
    }
    """

    def __init__(self, photo):
        """Read ``text``, ``date`` and ``sizes`` from the API dictionary."""
        self.description = photo['text']
        self.date = photo['date']
        self.thumbnail = self._get_thumbnail(photo)
        self.url = self._largest_photo(photo)

    @staticmethod
    def _get_thumbnail(photo):
        """Return the URL of the narrowest entry in ``photo['sizes']``.

        With equal widths the first such entry wins. Raises ``ValueError``
        when the size list is empty.
        """
        return min(photo['sizes'], key=lambda size: size['width'])['url']

    @staticmethod
    def _largest_photo(photo):
        """Return the URL of the widest entry in ``photo['sizes']``.

        With equal widths the first such entry wins. Raises ``ValueError``
        when the size list is empty.
        """
        return max(photo['sizes'], key=lambda size: size['width'])['url']

    def archive(self, directory, date, num):
        """Download the photo into ``<directory>/<date> - Фотографии``.

        The image is saved as ``<num>.jpg`` with *num* zero-padded to three
        digits; a non-empty description is stored beside it as a UTF-8
        ``.txt`` file.
        """
        img_directory = os.path.join(directory, date + " - Фотографии")
        os.makedirs(img_directory, exist_ok=True)

        file_name = os.path.join(img_directory, str(num).zfill(3))
        urllib.request.urlretrieve(self.url, file_name + ".jpg")

        if self.description:
            # Explicit encoding: descriptions are typically non-ASCII and the
            # locale default is not guaranteed to be able to store them.
            with open(file_name + ".txt", "w", encoding="utf-8") as info_file:
                info_file.write(self.description)
class Link(object):
    """A link attached to a vk.com post."""

    def __init__(self, link):
        """Build the link from the API's link dictionary.

        ``url`` is required. ``title`` and ``description`` fall back to an
        empty string when absent, and ``thumbnail`` is an empty string when
        the link carries no ``photo``.
        """
        self.title = link.get('title', '')
        self.description = link.get('description', '')
        self.url = link['url']
        if 'photo' in link:
            self.thumbnail = self._get_thumbnail(link['photo'])
        else:
            self.thumbnail = ""

    @staticmethod
    def _get_thumbnail(photo):
        """Return the URL of the narrowest entry in ``photo['sizes']``.

        With equal widths the first such entry wins. An empty or missing size
        list yields an empty string.
        """
        sizes = photo.get('sizes') or []
        if not sizes:
            return ""
        return min(sizes, key=lambda size: size['width'])['url']
class Post(object):
    """A single vk.com wall post and the logic to archive it to disk.

    ``template`` (path of the post template) is not set here; the caller must
    assign it before calling ``archive()``.
    """

    def __init__(self, wall, index):
        """Wrap ``wall['items'][index]``.

        For a repost, the first entry of ``copy_history`` replaces the post
        itself. The post's directory is placed under the module-level
        ``archive_directory`` and named after the post's timestamp. The post's
        date is printed as a side effect.
        """
        self.post = wall['items'][index]

        # Repost: archive the original entry instead of the wrapper.
        if 'copy_history' in self.post:
            self.post = self.post['copy_history'][0]

        post_id = str(self.post['id'])
        wall_id = str(self.post['owner_id'])
        self.url = 'https://vk.com/wall' + wall_id + '_' + post_id

        self.id = timestamp_to_date(self.post['date'], fmt='%Y-%m-%d-%H.%M.%S')
        self.date = timestamp_to_date(self.post['date'])

        self.title = []
        self.link_title = ""
        self.link_description = ""
        self.link_thumbnail = ""
        self.link_url = ""
        self.albums = []
        self.photo_num = 0

        self.author = str(self.post['signer_id']) if 'signer_id' in self.post else ""
        self.text = self.post['text']
        self.attachments = self.post.get('attachments', [])
        self.tags = self._extract_tags(self.text)

        self.post_directory = os.path.join(archive_directory, self.id)

        printc('GRAY', self.date, end='')

    @staticmethod
    def _extract_tags(text):
        """Return every ``#hashtag`` in *text*, without the leading ``#``.

        A tag ends at any whitespace (space, tab, newline), so a tag at the
        end of a line does not swallow the following line.
        """
        return re.findall(r"#(\S+)", text)

    def _mkdir(self):
        """Create the post directory.

        Returns True if it was created and False if it already existed.
        """
        if not os.path.exists(archive_directory):
            os.makedirs(archive_directory)

        if os.path.exists(self.post_directory):
            return False

        os.makedirs(self.post_directory)
        return True

    def archive(self):
        """Save the attachments and the rendered Markdown file for this post.

        An existing post directory is taken to mean the post was already
        downloaded: nothing is written and a notice is printed instead.
        """
        if not self._mkdir():
            printc('YELLOW', " | Запись уже загружена.")
            return

        print("")
        if self.attachments:
            # Attachments may extend self.title, self.tags and self.albums,
            # so they are processed before the template data is assembled.
            self._save_attachments()
            print("")

        data = {
            'layout': 'post',
            'author': self.author,
            'title': " / ".join(self.title),
            'categories': '[' + ", ".join(sorted({w.capitalize() for w in self.tags})) + ']',
            'vk': self.url,
            'link_title': self.link_title,
            'link_description': self.link_description,
            'link_thumbnail': self.link_thumbnail,
            'link_url': self.link_url,
            # One "- " item per album; an empty list still yields a bare "\n- ".
            'albums': "\n- " + "\n- ".join(self.albums),
            'content': self.text
        }

        self.md = TMPL(self.template, data)
        self.md.write(os.path.join(self.post_directory, self.id + '.md'))

    def _save_attachments(self):
        """Download album and photo attachments and record link metadata.

        Attachment types other than ``album``, ``photo`` and ``link`` are
        ignored.
        """
        for attachment in self.attachments:
            attachment_type = attachment['type']

            if attachment_type == 'album':
                album = Album(attachment['album'])

                # Untitled albums get a default directory name.
                if not album.title.strip():
                    album.title = "Фотографии"

                printc('MAGENTA', f"\tАльбом. Название: {Color.GREEN}{album.title} {Color.MAGENTA}{album.description}")

                # Everything after the first " - " contributes to the post title.
                self.title.extend(album.title.split(" - ")[1:])
                self.tags.extend(album.tags)
                self.albums.append(album.title)

                album.archive(self.post_directory)

            elif attachment_type == 'photo':
                ph = Photo(attachment['photo'])
                self.photo_num += 1

                printc('YELLOW', f"\tФотография: {str(self.photo_num).zfill(3)}.jpg | {ph.description}")
                ph.archive(self.post_directory, self.id, self.photo_num)

                # Loose photos share one per-post album, listed only once.
                base_album = self.id + u" - Фотографии"
                if base_album not in self.albums:
                    self.albums.append(base_album)

            elif attachment_type == 'link':
                url = Link(attachment['link'])

                printc('YELLOW', f"\tСсылка: {url.url}")
                printc('YELLOW', f"\tСсылка. Заголовок: {url.title}")

                self.link_title = url.title
                self.link_description = url.description
                self.link_thumbnail = url.thumbnail
                self.link_url = url.url
############################################################################
# Module-level state shared by Group, Album and Post; both values are
# placeholders until dump() assigns the real ones.
# tools: replaced in dump() with a vk_api.VkTools instance.
tools = ''
# archive_directory: root directory that posts are saved under, set in dump().
archive_directory = ''
@click.command()
@click.option('-u', '--user', help='Имя пользователя', prompt='VK User')
@click.option('-p', '--password', help='Пароль', prompt='VK Password', hide_input=True)
@click.option('-g', '--group', 'group_url', help='Сообщество для сохранения', prompt='Group URL')
@click.option('-o', '--out', 'directory', help='Директория для сохранения', default='')
def dump(user, password, group_url, directory):
    # Command-line entry point: log in, load the community and archive every
    # wall post. Documented with comments rather than a docstring because
    # click would print a docstring in the --help output.
    global tools
    global archive_directory

    # vk.com account
    vk = auth(user, password)
    if vk is None:
        # auth() has already printed the reason; stop here instead of failing
        # later with an unrelated error on the missing API object.
        raise click.ClickException('Не удалось авторизоваться в vk.com')

    tools = vk_api.VkTools(vk)

    # Community to save. A trailing slash would make the last URL segment
    # (used for the group name and the default directory) an empty string.
    group_url = group_url.rstrip('/')
    if 'vk.com' not in group_url:
        group_url = 'http://vk.com/' + group_url

    group = Group(vk, group_url)

    # Directory the posts are saved to
    if directory == '':
        # ./<group_url>, next to this script
        archive_directory = os.path.join(os.path.dirname(__file__), group_url.split('/')[-1])
    else:
        archive_directory = directory

    os.makedirs(archive_directory, exist_ok=True)

    # Group-info template and its output file
    group.template_group = os.path.abspath(os.path.join(os.path.dirname(__file__), 'tpl', 'config.yml'))
    group.info_file = os.path.abspath(os.path.join(archive_directory, 'config.yml'))

    header_message = f'# Сохранение сообщества {group_url} в директорию {archive_directory} #'

    printc('YELLOW', '#' * len(header_message))
    printc('YELLOW', header_message)
    printc('YELLOW', '#' * len(header_message))
    print("")

    group.save_info()

    # Post template
    post_template_file = os.path.abspath(os.path.join(os.path.dirname(__file__), 'tpl', 'post.md'))

    posts = group.wall['count']
    for i in range(posts):
        printc('CYAN', "#### {:3}/{} ".format(i + 1, posts), end='')

        post = Post(group.wall, i)
        post.template = post_template_file
        post.archive()
if __name__ == '__main__':
    # Dump vk data with Moscow timestamps
    os.environ['TZ'] = 'Europe/Moscow'
    # time.tzset() is only available on Unix; elsewhere the call is skipped
    # instead of aborting the script with AttributeError.
    if hasattr(time, 'tzset'):
        time.tzset()

    dump()