Очень часто задаваемый здесь вопрос - как сделать апсерт, который в MySQL называется INSERT ... ON DUPLICATE UPDATE
, а в стандарте поддерживается как часть операции MERGE
.
Учитывая, что PostgreSQL не поддерживает ее напрямую (до версии 9.5), как это сделать? Рассмотрим следующее:
CREATE TABLE testtable (
id integer PRIMARY KEY,
somedata text NOT NULL
);
INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');
Теперь представьте, что вы хотите "вставить" кортежи (2, 'Joe')
, (3, 'Alan')
, так что новое содержимое таблицы будет таким:
(1, 'fred'),
(2, 'Joe'), -- Changed value of existing tuple
(3, 'Alan') -- Added new tuple
Именно об этом говорят, обсуждая апсерт
. Крайне важно, что любой подход должен быть безопасным при наличии нескольких транзакций, работающих с одной и той же таблицей - либо с помощью явной блокировки, либо иным образом защищаясь от возникающих условий гонки.
Эта тема подробно обсуждается на https://stackoverflow.com/q/1109061/398670, но там речь идет об альтернативах синтаксису MySQL, и со временем она обросла множеством несвязанных деталей. Я работаю над окончательными ответами.
Эти техники также полезны для "вставки, если не существует, иначе ничего не делать", т.е. "вставки ... по дублирующему ключу игнорировать".
INSERT ... ON CONFLICT UPDATE
(и ON CONFLICT DO NOTHING
), т.е. upsert.
Сравнение с ON DUPLICATE KEY UPDATE
.
Краткое объяснение.
Об использовании см. руководство - в частности, пункт conflict_action на синтаксической диаграмме, а также пояснительный текст.
В отличие от решений для 9.4 и старше, приведенных ниже, эта функция работает с несколькими конфликтующими строками и не требует эксклюзивной блокировки или цикла повтора.
Коммит, добавляющий эту возможность, находится здесь, а обсуждение его разработки находится здесь.PostgreSQL не имеет встроенных средств UPSERT
(или MERGE
), и сделать это эффективно в условиях одновременного использования очень сложно.
В этой статье эта проблема рассматривается достаточно подробно.
В общем случае вы должны выбирать между двумя вариантами:
Использование отдельных операций вставки в ряд в цикле повтора является разумным вариантом, если вы хотите, чтобы множество соединений одновременно пытались выполнить вставку.
В документации к PostgreSQL есть полезная процедура, которая позволит вам сделать это в цикле внутри базы данных. В отличие от большинства наивных решений, она защищает от потери обновлений и гонок вставок. Она будет работать только в режиме READ COMMITTED
и безопасна только в том случае, если это единственное, что вы делаете в транзакции. Функция не будет работать корректно, если триггеры или вторичные уникальные ключи вызывают нарушения уникальности.
Эта стратегия очень неэффективна. При любой практической возможности следует ставить работу в очередь и выполнять массовую апсерт, как описано ниже.
Многие попытки решить эту проблему не учитывают откаты, поэтому приводят к неполным обновлениям. Две транзакции гоняются друг с другом; одна из них успешно выполняет INSERT
, другая получает ошибку дублирования ключа и вместо этого выполняет UPDATE
. Блокировка UPDATE
ожидает отката или фиксации INSERT
. Когда он откатывается, повторная проверка условия UPDATE
приводит к нулевым строкам, поэтому, хотя UPDATE
фиксируется, на самом деле он не выполнил ожидаемый апсерт. Необходимо проверить количество строк в результатах и повторить попытку там, где это необходимо.
В некоторых попытках решения проблемы также не учитываются гонки SELECT. Если вы попробуете очевидное и простое решение:
-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.
BEGIN;
UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;
-- Remember, this is WRONG. Do NOT COPY IT.
INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);
COMMIT;
то при одновременном запуске двух существует несколько вариантов отказа. Один из них - это уже обсуждавшаяся проблема с повторной проверкой обновлений. Другой - когда оба UPDATE
одновременно, совпадают с нулевыми строками и продолжают. Затем оба выполняют проверку EXISTS
, которая происходит перед INSERT
. Оба получают нулевые строки, поэтому оба выполняют INSERT
. Один из них терпит неудачу с ошибкой дублирования ключа.
Вот почему вам нужен цикл повторной попытки. Вы можете подумать, что можно предотвратить ошибки дублирования ключей или потери обновлений с помощью умного SQL, но это не так. Вам нужно проверить количество строк или обработать ошибки дублирования ключей (в зависимости от выбранного подхода) и повторить попытку.
Пожалуйста, не создавайте собственных решений для этого. Как и в случае с очередью сообщений, это, скорее всего, неправильно.
Иногда вы хотите выполнить массовый апсерт, когда у вас есть новый набор данных, который вы хотите объединить со старым существующим набором данных. Это очень эффективнее, чем апсерт отдельных строк, и должно быть предпочтительнее, когда это возможно. В этом случае вы обычно следуете следующему процессу:
CREATE
таблица TEMPORARY
копирование
или массовая вставка новых данных во временную таблицуБЛОКИРОВКА
целевой таблицы в режиме EXCLUSIVE
. Это позволит другим транзакциям SELECT
, но не вносить никаких изменений в таблицу.UPDATE ... FROM
существующих записей, используя значения из временной таблицы;INSERT
строк, которые еще не существуют в целевой таблице;COMMIT
, снимая блокировку.
Например, для примера, приведенного в вопросе, используя многозначный INSERT
для заполнения временной таблицы:BEGIN;
CREATE TEMPORARY TABLE newvals(id integer, somedata text);
INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');
LOCK TABLE testtable IN EXCLUSIVE MODE;
UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;
INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;
COMMIT;
MERGE
на вики PostgreSQLMERGE
?SQL-стандарт MERGE
на самом деле имеет плохо определенную семантику параллелизма и не подходит для апсертинга без предварительной блокировки таблицы.
Это действительно полезный OLAP-оператор для объединения данных, но он не является полезным решением для безопасной для обмена данными апсерт. Существует множество советов людям, использующим другие СУБД, использовать MERGE
для апсетов, но на самом деле это неправильно.
INSERT ... ON DUPLICATE KEY UPDATE
в MySQLMERGE
из MS SQL Server (но см. выше о проблемах MERGE
)MERGE
из Oracle (но см. выше о проблемах MERGE
)Я пытаюсь предложить другое решение проблемы одиночной вставки в версиях PostgreSQL до 9.5. Идея заключается в том, чтобы попытаться сначала выполнить вставку, а в случае, если запись уже существует, обновить ее:
do $$
begin
insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
update testtable set somedata = 'Joe' where id = 2;
end $$;
Обратите внимание, что это решение может быть применено только в том случае, если нет удалений строк таблицы.
Я не знаю об эффективности этого решения, но оно кажется мне достаточно разумным.
Вот некоторые примеры для вставки ... на конфликт ...` (ПГ 9.5+) :
Вставка на конфликт - ничего не делать. `вставить в манекен(идентификатор, название, размер) значений(1, 'новое_имя', 3) о конфликте ничего не делать;
Вставка на конфликт - обновления, указать цель конфликта через колонки.
вставить в манекен(идентификатор, название, размер) значений(1, 'новое_имя', 3) на конфликт(ИД) сделать обновление установить имя = 'новое_имя', размер = 3;
Вставка на конфликт - обновления, указать цель конфликта через имя ограничения.
вставить в манекен(идентификатор, название, размер) значений(1, 'новое_имя', 3) о конфликте на dummy_pkey ограничений сделать обновление установить имя = 'новое_имя', размер = 4;
Поскольку большой пост выше охватывает множество различных подходов SQL для версий Постгреса (не только не 9.5 как в вопросе), я хотел бы добавить, как это сделать в с SQLAlchemy если вы используете базы данных Postgres 9.5. Вместо того, чтобы реализовать свои вставки, также можно использовать с SQLAlchemy'ы функции (которые были добавлены в с SQLAlchemy 1.1). Лично я рекомендовал бы использовать это, если это возможно. Не только из-за удобства, но и потому, что позволяет справиться с любой СУБД PostgreSQL условия гонки, которые могут возникнуть.
Кросс-постинг из другого ответа я вчера дал (https://stackoverflow.com/a/44395983/2156909)
С SQLAlchemy поддерживает сейчас на конфликт с on_conflict_do_update два метода ()
и on_conflict_do_nothing()
:
Копировать из документации:
from sqlalchemy.dialects.postgresql import insert
stmt = insert(my_table).values(user_email='[email protected]', data='inserted data')
stmt = stmt.on_conflict_do_update(
index_elements=[my_table.c.user_email],
index_where=my_table.c.user_email.like('%@gmail.com'),
set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS
Проверено на Postgresql 9.3
Поскольку этот вопрос была закрыта, Я'м проводки вот как вы сделать это через с SQLAlchemy. Через рекурсию, он не повторяет массовая вставка или обновление для борьбы гонки и ошибки валидации.
Первый импорт
import itertools as it
from functools import partial
from operator import itemgetter
from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts
Теперь пару вспомогательных функций
def chunk(content, chunksize=None):
"""Groups data into chunks each with (at most) `chunksize` items.
https://stackoverflow.com/a/22919323/408556
"""
if chunksize:
i = iter(content)
generator = (list(it.islice(i, chunksize)) for _ in it.count())
else:
generator = iter([content])
return it.takewhile(bool, generator)
def gen_resources(records):
"""Yields a dictionary if the record's id already exists, a row object
otherwise.
"""
ids = {item[0] for item in session.query(Posts.id)}
for record in records:
is_row = hasattr(record, 'to_dict')
if is_row and record.id in ids:
# It's a row but the id already exists, so we need to convert it
# to a dict that updates the existing record. Since it is duplicate,
# also yield True
yield record.to_dict(), True
elif is_row:
# It's a row and the id doesn't exist, so no conversion needed.
# Since it's not a duplicate, also yield False
yield record, False
elif record['id'] in ids:
# It's a dict and the id already exists, so no conversion needed.
# Since it is duplicate, also yield True
yield record, True
else:
# It's a dict and the id doesn't exist, so we need to convert it.
# Since it's not a duplicate, also yield False
yield Posts(**record), False
И, наконец, функция вставки
def upsert(data, chunksize=None):
for records in chunk(data, chunksize):
resources = gen_resources(records)
sorted_resources = sorted(resources, key=itemgetter(1))
for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
items = [g[0] for g in group]
if dupe:
_upsert = partial(session.bulk_update_mappings, Posts)
else:
_upsert = session.add_all
try:
_upsert(items)
session.commit()
except IntegrityError:
# A record was added or deleted after we checked, so retry
#
# modify accordingly by adding additional exceptions, e.g.,
# except (IntegrityError, ValidationError, ValueError)
db.session.rollback()
upsert(items)
except Exception as e:
# Some other error occurred so reduce chunksize to isolate the
# offending row(s)
db.session.rollback()
num_items = len(items)
if num_items > 1:
upsert(items, num_items // 2)
else:
print('Error adding record {}'.format(items[0]))
Здесь's, как вы его используете
>>> data = [
... {'id': 1, 'text': 'updated post1'},
... {'id': 5, 'text': 'updated post5'},
... {'id': 1000, 'text': 'new post1000'}]
...
>>> upsert(data)
Преимущество над bulk_save_objects
является то, что он может справиться с отношениями, проверка ошибок и т. д. пластины (в отличие от массовые операции).