Dalam kasus penggunaan ini, saya mencoba menambahkan dokumen ke koleksi MongoDB menggunakan pymongo yang diambil dari berbagai umpan berita RSS berdasarkan tanggal (bukan datetime), judul, dan ringkasan artikel dalam format kerangka data (tanggal menjadi indeks ke kerangka data).

Ketika saya menyimpan kerangka data ke database, mereka disimpan dengan skema _id, tanggal, judul, ringkasan yang baik-baik saja.

Jadi yang saya coba lakukan hanyalah mengunggah baris-baris itu dalam kerangka data yang belum disimpan sebagai dokumen dalam koleksi. Ada beberapa cara yang sudah saya coba:

  1. Dapatkan dokumen terakhir di database, bandingkan dengan dataframe. Buat DF baru yang mengecualikan semua baris sebelumnya + baris yang dibandingkan. Ini seharusnya berfungsi, namun, masih mengunggah sekitar 20% dari baris yang telah disimpan sebelumnya dan saya tidak tahu mengapa.

  2. Simpan seluruh kerangka data, lalu gabungkan koleksi dan hapus duplikatnya: Kedengarannya bagus secara teori namun semua contoh melakukan ini ada di JS dan bukan python, jadi saya belum bisa membuatnya berfungsi.

  3. Buat indeks unik dari judul: Sekali lagi, ini seharusnya berfungsi secara teori, tetapi saya belum membuatnya berfungsi.

Satu hal yang tidak ingin saya lakukan adalah menanyakan seluruh koleksi dan menyimpannya sebagai DF, menggabungkannya, membuang duplikatnya, menghapus koleksinya, dan membuatnya kembali dari DF baru. Itu tidak akan menjadi masalah sekarang karena saya bekerja dengan 30 atau lebih dokumen, tetapi ketika saya akan bekerja dengan banyak koleksi dan jutaan dokumen, yah.. tidak terlalu efisien sama sekali.

Adakah yang punya saran yang bisa saya lihat/riset/contoh kode?

Berikut adalah kode yang saya kerjakan sekarang:

Unduh Umpan RSS

def getSymbolNews(self, symbol):
    self.symbol = symbol
    self.dbName = 'db_' + self.symbol
    self.columnName = 'col_News'
    self.topics = ['$' + self.symbol]
    self.sa = getNews().parseNews(fn.SeekingAlpha(topics = self.topics))
    self.yfin = getNews().parseNews(fn.Yahoo(topics = self.topics))
    self.wb_news = getNews().getWebullNews(self.symbol)        
    self.df = pd.concat([self.sa, self.yfin, self.wb_news], axis = 0, ignore_index = False)
    self.df.drop_duplicates(inplace = True)
    self.df.sort_index(ascending = True, inplace = True)
    del self.symbol, self.topics, self.sa, self.yfin, self.wb_news
    getNews().uploadRecords(self.dbName, self.columnName, self.df)
    return self.df

Unggah ke Koleksi:

def uploadRecords(self, dbName, columnName, data):
    self.data = data
    self.dbName = dbName
    self.columnName = columnName
    self.data.reset_index(inplace=True)
    self.data.rename(columns={'index': 'Date'}, inplace = True)
    mongoFunctions.insertRecords(self.dbName, self.columnName, self.data)
    del self.data
    gc.collect()
    return

Fungsi PyMongo untuk mengunggah:

def insertRecords(dbName: str, collectionName: str, data: object):
    """Inserts a pandas dataframe object into a MongoDB collection (table)

    Args:
        dbName (str): Database name
        collectionName (str): Collection name
        data (object): Pandas dataframe object
    """

    collection = getCollection(dbName, collectionName)
    query = queryAllRecords(dbName, collectionName)

    if query.shape == (0, 0):
        record = data.to_dict(orient="records")
        collection.insert(record)
    else:
        query.drop(["_id"], axis=1, inplace=True)
        if query.equals(data):
            return
        else:
            df_temp = pd.concat([query, data]).drop_duplicates(keep=False)
            records = df_temp.to_dict(orient="records")
            collection.insert_many(records)

    return
0
Dave 5 April 2021, 08:19

1 menjawab

Jawaban Terbaik

Saya ingin mengambil md5 hash dari dokumen dan menyimpannya sebagai _id; maka Anda bisa menggunakan insert_many() dengan ordered=False untuk menyisipkan item apa pun yang bukan duplikat; Anda dapat menjalankan ini sesering yang Anda suka dan hanya item baru yang akan ditambahkan; ingat bahwa jika bidang ada bahkan sedikit diubah, item baru akan ditambahkan; jika ini bukan perilaku yang Anda inginkan, ubah apa yang Anda berikan ke md5().

Kode akhirnya menjadi cukup mudah:

from pymongo import MongoClient
from pymongo.errors import BulkWriteError
import feedparser
from hashlib import md5
from json import dumps

db = MongoClient()['mydatabase']

entries = feedparser.parse("http://feeds.bbci.co.uk/news/world/rss.xml")['entries']

for item in entries:
    item['_id'] = md5(dumps(item).encode("utf-8")).hexdigest()

try:
    db.news.insert_many(entries, ordered=False)
except BulkWriteError:
    pass
1
Belly Buster 5 April 2021, 10:58