• home
  • about
  • 全ての投稿
  • ソフトウェア・ハードウェアの設定のまとめ
  • 分析関連のまとめ
  • ヘルスケア関連のまとめ
  • 生涯学習関連のまとめ

tweepy

date: 2021-10-19 excerpt: tweepyの使い方

tag: tweepy, twitter, random sampling


tweepyの使い方

概要

  • twitter apiのpythonラッパー
  • 2022年後半からtwitter apiはv1とv2に分かれており、使い方がそれぞれ異なる

インストール

$ python3 -m pip install tweepy

ツイッターのランダムサンプリングのサンプルコード

  • twitter api v1の例
import tweepy
import json
import gzip
from loguru import logger

consumer_key = "<>"
consumer_secret = "<>"
access_token = "<>"
access_token_secret = "<>"

class Printer(tweepy.Stream):
    """Stream listener that logs each received tweet and archives its raw JSON."""

    def on_status(self, status):
        """Log one tweet and write its payload to ``tweets/<id_str>`` (gzip).

        Args:
            status: Tweepy status object; its ``_json`` dict is what gets
                logged and saved.
        """
        # Local import keeps this snippet self-contained.
        import os

        payload = status._json
        id_str = payload["id_str"]
        text = payload["text"].replace("\n", " ")
        screen_name = payload["user"]["screen_name"]
        logger.info(f"{screen_name} {text}")

        # gzip.open does not create missing directories.
        os.makedirs("tweets", exist_ok=True)
        ser = json.dumps(payload, ensure_ascii=False, indent=2)
        # ensure_ascii=False emits raw non-ASCII characters, so pin the
        # encoding instead of relying on the platform default.
        with gzip.open(f"tweets/{id_str}", "wt", encoding="utf-8") as fp:
            fp.write(ser)

stream = Printer(consumer_key, consumer_secret, access_token, access_token_secret)

stream.sample(threaded=True, languages=["ja"])

ツイッターの特定のアカウントのタイムラインを取得

import tweepy
import json
import gzip
from loguru import logger
import time
import pandas as pd
from pathlib import Path

consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth, wait_on_rate_limit=True)

# Fetch the latest timeline of every account listed in users.csv and archive
# it as gzip-compressed JSON under user/<screen_name>.
df = pd.read_csv("users.csv")
# gzip.open does not create missing directories.
Path("user").mkdir(parents=True, exist_ok=True)
for target_screen_name in df["user"]:
    out_path = Path(f"user/{target_screen_name}")
    # Skip accounts that were already saved so the script can be resumed.
    if out_path.exists():
        continue
    ret = api.user_timeline(screen_name=target_screen_name, count=100)
    # A comprehension avoids the loop variable that shadowed builtin `object`.
    objects = [status._json for status in ret]
    ser = json.dumps(objects, indent=2, ensure_ascii=False)

    # ensure_ascii=False emits raw non-ASCII characters, so pin the encoding
    # instead of relying on the platform default.
    with gzip.open(out_path, "wt", encoding="utf-8") as fp:
        fp.write(ser)
    time.sleep(10.0)

ツイッターの特定のアカウントのタイムラインをカーソルで取得

  • v1のAPIなのでいつまで使えるかは不明
  • 1リクエストあたり200件という上限はCursorで回避できるが、全件は取得できない模様（user_timelineで遡れるのは直近の約3,200件まで）
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth, wait_on_rate_limit=True)

screen_name = "donki_donki"  # Don Quijote's PR account

# Create the output path once, up front. parents=True also creates
# "user-timelines" itself, which a plain mkdir() would fail on.
out_dir = Path(f"user-timelines/{screen_name}")
out_dir.mkdir(parents=True, exist_ok=True)

# Cursor pages through the timeline; each tweet is stored in its own file.
timeline = tweepy.Cursor(
    api.user_timeline, screen_name=screen_name, tweet_mode="extended"
).items()
for cnt, status in enumerate(timeline):
    obj = status._json
    tweet_id = obj["id"]  # renamed so the builtin `id` is not shadowed
    # ensure_ascii=False emits raw non-ASCII characters, so pin the encoding
    # instead of relying on the platform default.
    with open(out_dir / str(tweet_id), "w", encoding="utf-8") as fp:
        fp.write(json.dumps(obj, indent=2, ensure_ascii=False))
    print(screen_name, cnt)

ツイッターの特定のアカウントのフォロワーを取得

import tweepy
import json
import gzip
from loguru import logger
import time

consumer_key = "<>"
consumer_secret = "<>"
access_token = "<>"
access_token_secret = "<>"

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth, wait_on_rate_limit=True)
target_screen_name = "<>"

cnt = 0

def fetch(cursor=-1):
    """Download the followers of ``target_screen_name`` page by page.

    Each follower's profile JSON is written to ``users/<screen_name>`` (gzip)
    and the module-level ``cnt`` is incremented once per saved follower.

    Args:
        cursor: Pagination cursor to start from; the default -1 is what the
            first request is made with.
    """
    global cnt
    # Local import keeps this snippet self-contained.
    import os

    # gzip.open does not create missing directories.
    os.makedirs("users", exist_ok=True)

    # Loop instead of recursing once per page: the original had no base case,
    # so it never stopped and would eventually raise RecursionError.
    # Per the Twitter cursoring docs, a next cursor of 0 means there are no
    # more pages.
    while cursor != 0:
        result, (_prev_cur, next_cur) = api.get_followers(
            screen_name=target_screen_name, count=100, cursor=cursor
        )
        for user in result:
            o = user._json
            logger.info(f"""{cnt}, {o["name"]}, {o["screen_name"]}""")
            screen_name = o["screen_name"]
            # ensure_ascii=False emits raw non-ASCII characters, so pin the
            # encoding instead of relying on the platform default.
            with gzip.open(f"users/{screen_name}", "wt", encoding="utf-8") as fp:
                fp.write(json.dumps(o, ensure_ascii=False, indent=2))
            cnt += 1
        cursor = next_cur

fetch()

ツイッターのトレンドを取得

import tweepy
import json
import gzip
from loguru import logger
import fire
import time
import itertools
import datetime

consumer_key = "<>"
consumer_secret = "<>"
access_token = "<>"
access_token_secret = "<>"

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth)

def run(sleep_sec=60, woeid=1110809):
    """Poll Twitter trends forever, archiving every snapshot under ``trends/``.

    Args:
        sleep_sec: Seconds to wait between two polls.
        woeid: Where On Earth ID of the place whose trends are requested.
            NOTE(review): the original comment called 1110809 "Japan's
            WOEID"; country-level Japan is usually listed as 23424856 and
            1110809 looks like a city-level ID. Confirm before relying on it.
    """
    # Local import keeps this snippet self-contained.
    import os

    # gzip.open does not create missing directories.
    os.makedirs("trends", exist_ok=True)

    def _call_api():
        """Fetch the current trends once, save them and log their names."""
        trends = api.get_place_trends(id=woeid)
        ser = json.dumps(trends, indent=2, ensure_ascii=False)
        # Hours come before minutes; the original "%M:%H:%S" swapped them,
        # which produced misleading and badly sorted file names.
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # ensure_ascii=False emits raw non-ASCII characters, so pin the
        # encoding instead of relying on the platform default.
        with gzip.open(f"trends/{now}", "wt", encoding="utf-8") as fp:
            fp.write(ser)
        latest_trends = ",".join(o["name"] for o in trends[0]["trends"])
        logger.info(f"{now} complete, latest trends are {latest_trends}")

    while True:
        _call_api()
        time.sleep(sleep_sec)

if __name__ == "__main__":
    fire.Fire(run)

screen_nameからプロフィールを検索

  • twitter API v1の例
import json
from pathlib import Path
import time
from tqdm import tqdm
import tweepy

consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# Look up the profile of each screen name and cache it as JSON on disk.
# NOTE(review): `df` is not defined in this snippet; presumably a DataFrame
# with a "screen_name" column was loaded beforehand. Confirm.
# Path.open("w") does not create missing directories.
Path("twitter-profiles").mkdir(parents=True, exist_ok=True)
for screen_name in tqdm(df["screen_name"].sample(frac=1)):
    path = Path(f"twitter-profiles/{screen_name}")
    # Skip profiles that are already cached so the script can be resumed.
    if path.exists():
        continue
    try:
        user = api.get_user(screen_name=screen_name)
    except tweepy.TweepyException as exc:
        # Best effort: report the API failure and move on to the next user.
        # Only tweepy's own errors are caught so unrelated bugs stay visible.
        print(exc)
        continue
    # ensure_ascii=False emits raw non-ASCII characters, so pin the encoding
    # instead of relying on the platform default.
    path.write_text(
        json.dumps(user._json, indent=2, ensure_ascii=False), encoding="utf-8"
    )
    print(screen_name)
    time.sleep(1.0)

特定のプロフィールを持つユーザを検索

import tweepy
from loguru import logger
import time
import itertools
import pandas as pd
from pathlib import Path

consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth)


def run(q, page=0):
    """Search users matching ``q`` and return one page of results.

    Args:
        q: Query string passed to the user search.
        page: Result page passed to the user search.

    Returns:
        A list of ``(name, screen_name, description)`` tuples, with newlines
        in the description replaced by spaces.
    """
    rows = []
    for idx, hit in enumerate(api.search_users(q=q, page=page)):
        profile = hit._json
        name, screen_name = profile["name"], profile["screen_name"]
        # Flatten multi-line descriptions onto a single line.
        description = " ".join(profile["description"].split("\n"))
        logger.info(f"{idx} {name} {screen_name} {description}")
        rows.append((name, screen_name, description))
    return rows


# For every query, collect result pages 0..16 and save the de-duplicated
# users to users_<query>.csv. Queries whose CSV already exists are skipped.
LAST_PAGE = 16
for query in ["エンジニア", "デザイナー", "人事", "マネージャー"]:
    if Path(f"users_{query}.csv").exists():
        continue
    data = []
    for page in range(LAST_PAGE + 1):
        data.extend(run(page=page, q=query))
        # Pause between requests, but not after the final page.
        if page < LAST_PAGE:
            time.sleep(5.0)
    # Passing columns= up front also works when no user was found; assigning
    # df.columns afterwards raises ValueError on an empty, column-less frame.
    df = pd.DataFrame(data, columns=["name", "screen_name", "description"])
    df = df.drop_duplicates(subset=["screen_name"])
    logger.info(f"df size = {df.shape}")
    df.to_csv(f"users_{query}.csv", index=False)

特定のワードで検索する

▶ コード
import tweepy
from loguru import logger
import fire
import time

consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth)


def run(max_id):
    """Page backwards through the search results of the hard-coded query.

    Every tweet's ID and single-line text is logged. Paging stops once a
    request returns no tweets.

    Args:
        max_id: Upper bound (inclusive) on the tweet IDs to request, or
            ``None`` to send the first request without a bound.
    """
    rn = "\n"
    # Loop instead of recursing once per page: the original never terminated
    # and raised UnboundLocalError on `id` when a page came back empty.
    while True:
        params = {"q": "\"ひな祭り\"", "count": 100}
        if max_id is not None:
            params["max_id"] = max_id
        search_results = api.search_tweets(**params)
        if not search_results:
            return
        seen_ids = []
        for sr in search_results:
            obj = sr._json
            tweet_id = obj["id"]  # renamed so the builtin `id` is not shadowed
            seen_ids.append(tweet_id)
            logger.info(f'''{tweet_id}, {obj["text"].replace(rn, " ")}''')
        # max_id is inclusive, so continue strictly below the oldest tweet
        # seen; otherwise that tweet is returned again on every page.
        max_id = min(seen_ids) - 1
        time.sleep(5.0)

run(None)


tweepy, twitter, random sampling Share Tweet