diff --git a/alembic/versions/cff2f4e14057_add_scrape_id_to_tables.py b/alembic/versions/cff2f4e14057_add_scrape_id_to_tables.py
new file mode 100644
index 0000000..ac814c5
--- /dev/null
+++ b/alembic/versions/cff2f4e14057_add_scrape_id_to_tables.py
@@ -0,0 +1,34 @@
+"""add scrape_id to tables
+
+Revision ID: cff2f4e14057
+Revises: dbeb90576244
+Create Date: 2020-04-15 15:59:57.764377
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'cff2f4e14057'
+down_revision = 'dbeb90576244'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('organisation', sa.Column('scrape_id', sa.String(), nullable=True))
+    op.add_column('organisation', sa.Column('spider', sa.String(), nullable=True))
+    op.add_column('organisation_links', sa.Column('scrape_id', sa.String(), nullable=True))
+    op.add_column('organisation_links', sa.Column('spider', sa.String(), nullable=True))
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column('organisation_links', 'spider')
+    op.drop_column('organisation_links', 'scrape_id')
+    op.drop_column('organisation', 'spider')
+    op.drop_column('organisation', 'scrape_id')
+    # ### end Alembic commands ###
diff --git a/findthatcharity_import/db.py b/findthatcharity_import/db.py
index a46c76c..51aa3b3 100644
--- a/findthatcharity_import/db.py
+++ b/findthatcharity_import/db.py
@@ -36,6 +36,8 @@
     Column("organisationType", JSON),
     Column("organisationTypePrimary", String),
     Column("source", String),
+    Column("spider", String),
+    Column("scrape_id", String),
 )
 
 tables["source"] = Table('source', metadata,
@@ -56,6 +58,8 @@
     Column('organisation_id_b', String, primary_key = True),
     Column('description', String),
     Column('source', String),
+    Column("spider", String),
+    Column("scrape_id", String),
 )
 
 
diff --git a/findthatcharity_import/pipelines/sqlsave_pipeline.py b/findthatcharity_import/pipelines/sqlsave_pipeline.py
index e524aa3..80a47ab 100644
--- a/findthatcharity_import/pipelines/sqlsave_pipeline.py
+++ b/findthatcharity_import/pipelines/sqlsave_pipeline.py
@@ -7,7 +7,7 @@
 from sqlalchemy import create_engine, and_
 from sqlalchemy.exc import InternalError, IntegrityError
 from sqlalchemy.orm import sessionmaker
-from sqlalchemy.sql.expression import insert
+from sqlalchemy.sql.expression import insert, delete
 from sqlalchemy.dialects import postgresql, mysql
 import scrapy
 from scrapy import signals
@@ -77,6 +77,11 @@ def commit_records(self, spider):
                 upsert_statement = insert(table).prefix_with("OR REPLACE")
 
             for r in self.records[t]:
+                if "scrape_id" in cols:
+                    r['scrape_id'] = self.crawl_id
+                if "spider" in cols:
+                    r['spider'] = self.spider_name
+
                 if self.engine.name == 'postgresql':
                     vals.append({c: r.get(c) for c in cols})
                 else:
@@ -93,12 +98,24 @@ def open_spider(self, spider):
         self.spider_name = spider.name
         if self.db_uri:
             self.engine = create_engine(self.db_uri)
-            self.conn = self.engine.connect()
+            Session = sessionmaker(bind=self.engine)
+            self.conn = Session()
 
             self.tables = {t.name: t for t in tables.values()}
             self.records = {t: [] for t in self.tables}
             metadata.create_all(self.engine)
 
+            # delete any existing records from the tables
+            for t, table in self.tables.items():
+                cols = [c.name for c in table.columns]
+                if "scrape_id" in cols and "spider" in cols:
+                    self.conn.execute(
+                        delete(table).where(and_(
+                            table.c.scrape_id != self.crawl_id,
+                            table.c.spider == self.spider_name
+                        ))
+                    )
+
             # do any tasks before the spider is run
             # if hasattr(spider, "name"):
             #     if self.conn.engine.dialect.has_table(self.conn.engine, tables["organisation"].name):
@@ -120,6 +137,7 @@ def open_spider(self, spider):
     def spider_closed(self, spider, reason):
         if hasattr(self, "conn"):
             self.commit_records(spider)
+            self.conn.commit()
             self.conn.close()
 
     def save_stats(self):