RTDB를 지속적으로 업데이트하게 되면서, key column에 대해 중복되지 않게 데이터를 업데이트하는 로직의 필요성이 생겼다.
Pandas의 to_sql 메서드의 경우 같은 이름의 SQL 테이블이 존재할 때 3가지 방식을 지원하는데, 아래와 같다.
if_exists: 'fail'일 때
테이블이 존재하면 ValueError를 일으킨다.
if_exists: 'replace'일 때
테이블이 존재하면 기존의 테이블을 없애고 새로운 테이블을 덮어씌워 저장한다.
if_exists: 'append'일 때
테이블이 존재하면 새로운 table을 새로운 행으로 추가하여 저장한다.
특정 column에 대해 중복된 행을 제외하고 데이터를 업데이트 하는 기능을 지원할 줄 알았는데, 그런 기능은 없었다.
Python으로 이와 같은 고민을 한 사람이 없나 찾아보니, 나와 다른 PostgreSQL을 사용하기는 했지만 같은 아이디어를 가진 분이 계셨다.
https://www.ryanbaumann.com/blog/2016/4/30/python-pandas-tosql-only-insert-new-rows
데이터를 UPSERT(UPDATE + INSERT)하기 위해서는 별도의 로직을 짜야 하는데, 아래 코드와 같다.
# DB에 중복되면 안되는 열들을 제외하고 새로 추가해야 할 Data들만 업데이트하기
# df: 새로 업데이트 할 Pandas DataFrame
# tablename: SQL의 테이블명
# engine: SQL connector
# dup_cols: 중복 여부를 결정할 key columns
# filter_continuous_col: where문으로 제어할 continuous columns
# filter_categorical_col: where문으로 제어할 categorical columns
def filter_new_df(df, tablename, engine, dup_cols=[],
filter_continuous_col=None, filter_categorical_col=None):
args = 'SELECT %s FROM %s' %(', '.join(['"{0}"'.format(col) for col in dup_cols]), tablename)
args_contin_filter, args_cat_filter = None, None
if filter_continuous_col is not None:
if df[filter_continuous_col].dtype == 'datetime64[ns]':
args_contin_filter = """ "%s" BETWEEN Convert(datetime, '%s')
AND Convert(datetime, '%s')""" %(filter_continuous_col,
df[filter_continuous_col].min(), df[filter_continuous_col].max())
if filter_categorical_col is not None:
args_cat_filter = ' "%s" in(%s)' %(filter_categorical_col,
', '.join(["'{0}'".format(value) for value in df[filter_categorical_col].unique()]))
if args_contin_filter and args_cat_filter:
args += ' Where ' + args_contin_filter + ' AND' + args_cat_filter
elif args_contin_filter:
args += ' Where ' + args_contin_filter
elif args_cat_filter:
args += ' Where ' + args_cat_filter
df.drop_duplicates(dup_cols, keep='last', inplace=True)
df = pd.merge(df, pd.read_sql(args, engine), how='left', on=dup_cols, indicator=True)
df = df[df['_merge'] == 'left_only']
df.drop(['_merge'], axis=1, inplace=True)
return df
아이디어는 지정한 key column들과 조건들에 대해 SQL Query문을 작성하고, Pandas의 drop_duplicates와 merge 메서드를 활용하여 중복되지 않는 차집합을 DataFrame을 반환한다는 것이다.
해당 방식으로 도출된 DataFrame은 단순히 to_sql의 append 방식으로 데이터베이스에 UPSERT하면 된다.
'Python > Database' 카테고리의 다른 글
pyodbc를 이용하여 python에서 MSSQL 연결하기 (0) | 2021.06.30 |
---|