tweepyの使い方
概要
- Twitter APIのPythonラッパー
- 2022年後半からTwitter APIはv1とv2に分かれており、使い方がそれぞれ異なる
インストール
$ python3 -m pip install tweepy
ツイッターのランダムサンプリングのサンプルコード
- twitter api v1の例
import tweepy
import json
import gzip
from loguru import logger
# API credentials (placeholders — fill in your own values before running).
consumer_key = "<>"
consumer_secret = "<>"
access_token = "<>"
access_token_secret = "<>"
class Printer(tweepy.Stream):
    """Stream subclass that logs every received tweet and archives it to disk."""

    def on_status(self, status):
        """Log the tweet and save its raw JSON payload as a gzip file.

        Args:
            status: Status object delivered by tweepy; the raw payload is
                read from ``status._json``.
        """
        payload = status._json
        id_str = payload["id_str"]
        text = payload["text"].replace("\n", " ")
        screen_name = payload["user"]["screen_name"]
        logger.info(f"{screen_name} {text}")
        # ensure_ascii=False keeps non-ASCII text as-is, so write UTF-8
        # explicitly instead of relying on the locale's default encoding.
        with gzip.open(f"tweets/{id_str}", "wt", encoding="utf-8") as fp:
            fp.write(json.dumps(payload, ensure_ascii=False, indent=2))


# NOTE: files are written under "tweets/", which is not created here.
stream = Printer(consumer_key, consumer_secret, access_token, access_token_secret)
# Start sampling with threaded=True, restricted to Japanese ("ja") tweets.
stream.sample(threaded=True, languages=["ja"])
ツイッターの特定のアカウントのタイムラインを取得
import tweepy
import json
import gzip
from loguru import logger
import time
import pandas as pd
from pathlib import Path
# API credentials (left blank here — fill in your own values before running).
consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""
# Build an authenticated v1 API client from the four credentials above.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# wait_on_rate_limit=True asks tweepy to wait automatically on rate limits.
api = tweepy.API(auth, wait_on_rate_limit=True)
# Read the target accounts; users.csv must provide a "user" column.
df = pd.read_csv("users.csv")
for target_screen_name in df["user"]:
    out_path = Path(f"user/{target_screen_name}")
    # Skip accounts that were already saved so the script can be re-run.
    if out_path.exists():
        continue
    statuses = api.user_timeline(screen_name=target_screen_name, count=100)
    # Comprehension instead of an append loop; also avoids shadowing the
    # builtin name `object`.
    objects = [status._json for status in statuses]
    # ensure_ascii=False keeps non-ASCII text as-is, so write UTF-8
    # explicitly instead of relying on the locale's default encoding.
    with gzip.open(out_path, "wt", encoding="utf-8") as fp:
        fp.write(json.dumps(objects, indent=2, ensure_ascii=False))
    # Pause between accounts.
    time.sleep(10.0)
ツイッターの特定のアカウントのタイムラインをカーソルで取得
- v1のAPIなのでいつまで使えるかは不明
- 200件などの上限はないが全件は取得できない模様
# Build an authenticated v1 API client; the credential variables are not
# defined in this snippet and must already be set.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# wait_on_rate_limit=True asks tweepy to wait automatically on rate limits.
api = tweepy.API(auth, wait_on_rate_limit=True)
screen_name = "donki_donki"  # PR account of Don Quijote
out_dir = Path(f"user-timelines/{screen_name}")
# Create the output directory once, up front. parents=True also creates
# "user-timelines" itself; a plain mkdir() raises if that parent is missing.
out_dir.mkdir(parents=True, exist_ok=True)
cnt = 0
timeline = tweepy.Cursor(api.user_timeline, screen_name=screen_name, tweet_mode="extended")
for status in timeline.items():
    obj = status._json
    tweet_id = obj["id"]  # not named `id`, to avoid shadowing the builtin
    # One JSON file per tweet, named after the tweet ID; written as UTF-8
    # because ensure_ascii=False leaves non-ASCII text unescaped.
    with open(out_dir / str(tweet_id), "w", encoding="utf-8") as fp:
        fp.write(json.dumps(obj, indent=2, ensure_ascii=False))
    print(screen_name, cnt)
    cnt += 1
ツイッターの特定のアカウントのフォロワーを取得
import tweepy
import json
import gzip
from loguru import logger
import time
# API credentials (placeholders — fill in your own values before running).
consumer_key = "<>"
consumer_secret = "<>"
access_token = "<>"
access_token_secret = "<>"
# Build an authenticated v1 API client from the four credentials above.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# wait_on_rate_limit=True asks tweepy to wait automatically on rate limits.
api = tweepy.API(auth, wait_on_rate_limit=True)
# Screen name of the account whose followers are fetched (placeholder).
target_screen_name = "<>"
# Running count of followers saved so far; incremented by fetch().
cnt = 0
def fetch(cursor=-1):
    """Save the followers of ``target_screen_name``, one gzip JSON file each.

    Pages through the follower list starting at ``cursor`` and increments the
    module-level ``cnt`` for every follower written.

    Args:
        cursor: Pagination cursor to start from (defaults to -1).
    """
    global cnt
    # Loop instead of recursing once per page: recursion grows the call stack
    # with every page and has no stopping condition. A next cursor of 0 is
    # treated as the end of the list.
    while cursor != 0:
        result, (_prev_cur, next_cur) = api.get_followers(
            screen_name=target_screen_name, count=100, cursor=cursor
        )
        for user in result:
            o = user._json
            logger.info(f"""{cnt}, {o["name"]}, {o["screen_name"]}""")
            screen_name = o["screen_name"]
            # ensure_ascii=False keeps non-ASCII text as-is, so write UTF-8
            # explicitly instead of relying on the locale's default encoding.
            with gzip.open(f"users/{screen_name}", "wt", encoding="utf-8") as fp:
                fp.write(json.dumps(o, ensure_ascii=False, indent=2))
            cnt += 1
        cursor = next_cur


fetch()
ツイッターのトレンドを取得
import tweepy
import json
import gzip
from loguru import logger
import fire
import time
import itertools
import datetime
# API credentials (placeholders — fill in your own values before running).
consumer_key = "<>"
consumer_secret = "<>"
access_token = "<>"
access_token_secret = "<>"
# Build an authenticated v1 API client from the four credentials above.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
def run(sleep_sec=60):
    """Poll the place-trends endpoint forever, archiving every snapshot.

    Args:
        sleep_sec: Seconds to wait between two consecutive polls.
    """

    def _call_api():
        """Fetch the trends once, save them as gzip JSON, and log their names."""
        # WOEID sent to the API. The original note calls it Japan's WOEID.
        # NOTE(review): Japan is usually listed as 23424856 — confirm.
        trends = api.get_place_trends(id=1110809)
        # Timestamp used as the file name: date, then hour:minute:second.
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # ensure_ascii=False keeps non-ASCII text as-is, so write UTF-8
        # explicitly instead of relying on the locale's default encoding.
        with gzip.open(f"trends/{now}", "wt", encoding="utf-8") as fp:
            fp.write(json.dumps(trends, indent=2, ensure_ascii=False))
        latest_trends = ",".join(o["name"] for o in trends[0]["trends"])
        logger.info(f"{now} complete, latest trends are {latest_trends}")

    # Runs until the process is interrupted.
    while True:
        _call_api()
        time.sleep(sleep_sec)


if __name__ == "__main__":
    # Expose run() as a command-line entry point via python-fire.
    fire.Fire(run)
screen_nameからプロフィールを検索
- twitter API v1の例
import json
from pathlib import Path
import time
from tqdm import tqdm
import tweepy
# API credentials (left blank here — fill in your own values before running).
consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""
# Build an authenticated v1 API client from the four credentials above.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
# NOTE(review): `df` is not defined in this snippet; it is presumably a
# DataFrame with a "screen_name" column loaded beforehand — confirm.
# sample(frac=1) visits every row in shuffled order.
for screen_name in tqdm(df["screen_name"].sample(frac=1)):
    path = Path(f"twitter-profiles/{screen_name}")
    # Skip profiles that were already saved so the script can be re-run.
    if path.exists():
        continue
    try:
        user = api.get_user(screen_name=screen_name)
    except Exception as exc:
        # Best-effort: report the failure and move on to the next account.
        print(exc)
        continue
    # ensure_ascii=False keeps non-ASCII text as-is, so write UTF-8
    # explicitly instead of relying on the locale's default encoding.
    with path.open("w", encoding="utf-8") as fp:
        fp.write(json.dumps(user._json, indent=2, ensure_ascii=False))
    print(screen_name)
    # Pause between lookups.
    time.sleep(1.0)
特定のプロフィールを持つユーザを検索
import tweepy
from loguru import logger
import time
import itertools
import pandas as pd
from pathlib import Path
# API credentials (left blank here — fill in your own values before running).
consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""
# Build an authenticated v1 API client from the four credentials above.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
def run(q, page=0):
    """Fetch one page of user-search results for ``q``.

    Every hit is logged together with its index on the page.

    Args:
        q: Search query passed to ``api.search_users``.
        page: Result page passed to ``api.search_users``.

    Returns:
        A list of ``(name, screen_name, description)`` tuples, with newlines
        in the description replaced by spaces.
    """
    rows = []
    for idx, hit in enumerate(api.search_users(q=q, page=page)):
        profile = hit._json
        name, screen_name = profile["name"], profile["screen_name"]
        description = profile["description"].replace("\n", " ")
        logger.info(f"{idx} {name} {screen_name} {description}")
        rows.append((name, screen_name, description))
    return rows
for query in ["エンジニア", "デザイナー", "人事", "マネージャー"]:
    csv_path = Path(f"users_{query}.csv")
    # Skip queries whose CSV already exists so the script can be re-run.
    if csv_path.exists():
        continue
    data = []
    # Pages 0 through 16, pausing between requests but not after the last one.
    for page in range(17):
        data.extend(run(page=page, q=query))
        if page < 16:
            time.sleep(5.0)
    # Name the columns in the constructor: assigning df.columns afterwards
    # raises a length-mismatch error when no users were found at all.
    df = pd.DataFrame(data, columns=["name", "screen_name", "description"])
    df.drop_duplicates(subset=["screen_name"], inplace=True)
    logger.info(f"df size = {df.shape}")
    df.to_csv(csv_path, index=False)
特定のワードで検索する
▶ コード
import tweepy
from loguru import logger
import fire
import time
# API credentials (left blank here — fill in your own values before running).
consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""
# Build an authenticated v1 API client from the four credentials above.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
def run(max_id):
    """Page backwards through search results for a fixed query, logging tweets.

    Requests up to 100 tweets per call and keeps going with ever-older tweets
    until a request returns nothing.

    Args:
        max_id: Tweet ID passed as ``max_id`` on the first request, or
            ``None`` to send the first request without ``max_id``.
    """
    params = {"q": "\"ひな祭り\"", "count": 100}
    # Loop instead of recursing once per page, which grows the call stack
    # with every page and has no stopping condition.
    while True:
        if max_id is not None:
            params["max_id"] = max_id
        search_results = api.search_tweets(**params)
        if not search_results:
            # Nothing left to fetch; without this check the last tweet ID
            # below would be unbound on an empty page.
            return
        for sr in search_results:
            obj = sr._json
            tweet_id = obj["id"]  # not named `id`, to avoid shadowing the builtin
            text = obj["text"].replace("\n", " ")
            logger.info(f"{tweet_id}, {text}")
        # Pause between requests.
        time.sleep(5.0)
        # max_id is inclusive, so step just below the last tweet seen to
        # avoid fetching it again on the next page.
        max_id = tweet_id - 1


run(None)