An example pipeline that uses bulk insert while saving scraped items into the database using SQLAlchemy.
import logging

from scrapy import Spider
from sqlalchemy.orm import sessionmaker

from example.items import ProductItem
from example.models import Price, Product, create_table, db_connect

logger = logging.getLogger(__name__)


class ExampleScrapyPipeline:
    """
    An example pipeline that saves new products and their prices into the database
    """

    def __init__(self):
        """
        Initializes the database connection and sessionmaker
        """
        engine = db_connect()
        create_table(engine)
        self.Session = sessionmaker(bind=engine)
        self.products = []
        self.prices = []

    def process_item(self, item: ProductItem, spider: Spider) -> ProductItem:
        """
        This method is called for every item pipeline component
        We save each product as a dict in the `self.products` list so that it can later be used for bulk saving
        """
        product = dict(
            name=item['name'],
            vendor=item['vendor'],
            quantity=item['quantity'],
            url=item['url'],
        )

        self.products.append(product)
        self.prices.append(item['price'].amount)

        return item

    def close_spider(self, spider: Spider) -> None:
        """
        Saves all the scraped products and prices in bulk on the spider close event
        Sadly, we currently have to use `return_defaults=True` while bulk saving Product objects, which greatly reduces the performance gains of the bulk operation
        Prices, however, do get inserted into the DB in bulk
        Reference: https://stackoverflow.com/questions/36386359/sqlalchemy-bulk-insert-with-one-to-one-relation
        """
        session = self.Session()

        try:
            logger.info('Saving products in bulk operation to the database.')
            # `return_defaults=True` makes each product's generated PK available for FK usage in the price table, at the cost of inserting products one at a time
            session.bulk_insert_mappings(Product, self.products, return_defaults=True)

            logger.info('Saving prices in bulk operation to the database.')
            prices = [dict(price=price, product_id=product['id']) for product, price in zip(self.products, self.prices)]
            session.bulk_insert_mappings(Price, prices)

            session.commit()
        except Exception as error:
            logger.exception(error, extra=dict(spider=spider))
            session.rollback()
            raise
        finally:
            session.close()
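The pipeline imports `Price`, `Product`, `create_table`, and `db_connect` from `example.models`, which is not included in this gist. Below is a minimal sketch of what such a module could look like, assuming a classic SQLAlchemy declarative setup and a `CONNECTION_STRING` entry in the Scrapy settings; the table and column definitions are illustrative, inferred from the dict keys used in the pipeline above.

from scrapy.utils.project import get_project_settings
from sqlalchemy import Column, ForeignKey, Integer, Numeric, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

Base = declarative_base()


def db_connect():
    """Creates a SQLAlchemy engine; assumes a CONNECTION_STRING entry in the Scrapy settings."""
    return create_engine(get_project_settings().get('CONNECTION_STRING'))


def create_table(engine):
    """Creates all tables defined on the declarative base if they do not exist yet."""
    Base.metadata.create_all(engine)


class Product(Base):
    __tablename__ = 'product'

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    vendor = Column(String)
    quantity = Column(String)
    url = Column(String)

    prices = relationship('Price', back_populates='product')


class Price(Base):
    __tablename__ = 'price'

    id = Column(Integer, primary_key=True)
    price = Column(Numeric)
    product_id = Column(Integer, ForeignKey('product.id'))

    product = relationship('Product', back_populates='prices')

For the pipeline to run at all, it would also need to be registered under `ITEM_PIPELINES` in the project's settings.py, as with any Scrapy item pipeline.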
import logging
from urllib.parse import urlencode

from scrapy.utils.project import get_project_settings

logger = logging.getLogger(__name__)
settings = get_project_settings()


def get_proxy_url(url: str) -> str:
    """
    We send all our requests to the https://www.scraperapi.com/ API endpoint in order to use their proxy servers
    This function converts a regular URL into Scraper API's proxy URL
    """
    scraper_api_key = settings.get('SCRAPER_API_KEY')

    if not scraper_api_key:
        logger.warning('Scraper API key not set.', extra=dict(url=url))
        return url

    proxied_url = 'http://api.scraperapi.com/?' + urlencode({'api_key': scraper_api_key, 'url': url})
    logger.info(f'Scraping using Scraper API. URL <{url}>.')

    return proxied_url
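For completeness, here is one way the helper above could be wired into a spider. The spider name, start URL, and the module path of the helper are assumptions for illustration, and `SCRAPER_API_KEY` would need to be defined in the project's settings.py so that `get_project_settings()` can pick it up.

import scrapy

from example.utils import get_proxy_url  # assumed module path; adjust to wherever this helper actually lives


class ExampleSpider(scrapy.Spider):
    name = 'example'
    start_urls = ['https://example.com/products']  # placeholder URL

    def start_requests(self):
        for url in self.start_urls:
            # Route every request through Scraper API's proxy endpoint (falls back to the plain URL if no API key is set)
            yield scrapy.Request(get_proxy_url(url), callback=self.parse)

    def parse(self, response):
        # Parse the response and yield ProductItem instances here
        pass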