Spring Data Jpa如何实现批量插入或更新
更新时间:2024年12月09日 11:52:38 作者:点滴1993
文章总结:本文分享了四种Spring Data JPA批量插入或更新的方法,包括BatchConsumer、QueryParameterBuilder、KeyValue和SqlUtil,旨在为开发者提供实用的参考
Spring Data Jpa批量插入或更新
1. BatchConsumer
package com.demo.common.hibernate.batch; import com.demo.common.hibernate.querydsl.QueryParameterBuilder; /** * 批量数据消费者接口,用于设置 SQL 参数并执行操作。 * * @param <T> 记录类型的泛型 * @author xm.z */ @FunctionalInterface public interface BatchConsumer<T> { /** * 设置 SQL 参数并执行操作。 * * @param builder 参数构建对象 * @param record 要处理的记录 */ void accept(QueryParameterBuilder builder, T record); }
2. QueryParameterBuilder
package com.demo.common.hibernate.querydsl; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.hibernate.jpa.TypedParameterValue; import org.hibernate.type.*; import org.springframework.util.Assert; import javax.persistence.Query; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Date; import java.util.concurrent.atomic.AtomicInteger; /** * QueryParameterBuilder * <p> * A utility class for building parameters for query. * * @author xm.z */ @Slf4j @Getter public class QueryParameterBuilder { /** * The native query object to be used for parameter setting */ private final Query nativeQuery; /** * The counter for parameter position */ @Getter(value = AccessLevel.NONE) private final AtomicInteger position; /** * The current date and time when the QueryParameterBuilder instance is created */ private final LocalDateTime now; /** * Private constructor to initialize QueryParameterBuilder */ private QueryParameterBuilder(Query nativeQuery, AtomicInteger position) { this.nativeQuery = nativeQuery; this.position = position; this.now = LocalDateTime.now(); } /** * Retrieves the current position of the parameter. * * @return The current position of the parameter. */ public Integer obtainCurrentPosition() { return position.get(); } /** * Create an instance of QueryParameterBuilder. * * @param nativeQuery The native query object * @param position The parameter position counter * @return QueryParameterBuilder instance */ public static QueryParameterBuilder create(Query nativeQuery, AtomicInteger position) { Assert.notNull(nativeQuery, "Native query must not be null"); Assert.notNull(position, "Position must not be null"); return new QueryParameterBuilder(nativeQuery, position); } /** * Set a parameter of type Long. * * @param value The Long value for the parameter * @return The current QueryParameterBuilder instance */ public QueryParameterBuilder setParameter(Long value) { return this.setParameter(StandardBasicTypes.LONG, value); } /** * Set a parameter of type Integer. * * @param value The Integer value for the parameter * @return The current QueryParameterBuilder instance */ public QueryParameterBuilder setParameter(Integer value) { return this.setParameter(StandardBasicTypes.INTEGER, value); } /** * Set a parameter of type BigDecimal. * * @param value The BigDecimal value for the parameter * @return The current QueryParameterBuilder instance */ public QueryParameterBuilder setParameter(BigDecimal value) { return this.setParameter(StandardBasicTypes.BIG_DECIMAL, value); } /** * Set a parameter of type String. * * @param value The String value for the parameter * @return The current QueryParameterBuilder instance */ public QueryParameterBuilder setParameter(String value) { return this.setParameter(StandardBasicTypes.STRING, value); } /** * Set a parameter of type Boolean. * * @param value The Boolean value for the parameter * @return The current QueryParameterBuilder instance */ public QueryParameterBuilder setParameter(Boolean value) { return this.setParameter(StandardBasicTypes.BOOLEAN, value); } /** * Set a parameter of type Date. * * @param value The Date value for the parameter * @return The current QueryParameterBuilder instance */ public QueryParameterBuilder setParameter(Date value) { return this.setParameter(StandardBasicTypes.DATE, value); } /** * Set a parameter of type LocalDate. * * @param value The LocalDate value for the parameter * @return The current QueryParameterBuilder instance */ public QueryParameterBuilder setParameter(LocalDate value) { return this.setParameter(LocalDateType.INSTANCE, value); } /** * Set a parameter of type LocalTime. * * @param value The LocalTime value for the parameter * @return The current QueryParameterBuilder instance */ public QueryParameterBuilder setParameter(LocalTime value) { return this.setParameter(LocalTimeType.INSTANCE, value); } /** * Set a parameter of type LocalDateTime. * * @param value The LocalDateTime value for the parameter * @return The current QueryParameterBuilder instance */ public QueryParameterBuilder setParameter(LocalDateTime value) { return this.setParameter(LocalDateTimeType.INSTANCE, value); } /** * Add or include a query condition to the native query object and set the parameter value. * * @param type The parameter type * @param value The parameter value * @return The current QueryParameterBuilder instance */ public QueryParameterBuilder setParameter(Type type, Object value) { return this.setParameter(position.getAndIncrement(), type, value); } /** * Add or include a query condition to the native query object and set the parameter value at the specified position. * * @param position The position of the parameter in the query * @param type The parameter type * @param value The parameter value * @return The current QueryParameterBuilder instance */ public QueryParameterBuilder setParameter(int position, Type type, Object value) { TypedParameterValue typedParameterValue = new TypedParameterValue(type, value); if (log.isDebugEnabled()) { log.debug("Setting parameter at position {}: {}", position, typedParameterValue); } nativeQuery.setParameter(position, typedParameterValue); return this; } }
3. KeyValue
package com.demo.common.model; import io.swagger.v3.oas.annotations.media.Schema; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; /** * 用于表示键值对的通用类 * * @param <K> 键的类型 * @param <V> 值的类型 * @author xm.z */ @Data @NoArgsConstructor @AllArgsConstructor public class KeyValue<K, V> implements Serializable { private static final long serialVersionUID = 1L; /** * 键 */ @Schema(title = "键") private K key; /** * 值 */ @Schema(title = "值") private V value; }
4. SqlUtil
package com.demo.common.hibernate.util; import com.demo.common.hibernate.batch.BatchConsumer; import com.demo.common.hibernate.querydsl.QueryParameterBuilder; import com.demo.common.model.KeyValue; import cn.hutool.extra.spring.SpringUtil; import cn.hutool.core.collection.CollUtil; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import javax.persistence.EntityManager; import javax.persistence.Query; import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** * SqlUtil * * @author xm.z */ @Slf4j @SuppressWarnings("all") public class SqlUtil { /** * Default batch insert size. */ public static final int DEFAULT_BATCH_SIZE = 100; /** * Private constructor. */ private SqlUtil() { } /** * Batch insert records into the database. * * @param tableFields The table fields information * @param records The list of records to be inserted * @param consumer The consumer function interface for customizing the insert behavior * @param <T> The type of records * @return The number of records successfully inserted */ public static <T> int batchInsert(@NonNull KeyValue<String, LinkedHashSet<String>> tableFields, @NonNull List<T> records, @NonNull BatchConsumer<? super T> consumer) { return batchInsert(DEFAULT_BATCH_SIZE, tableFields, records, consumer); } /** * Perform batch insert operation with the specified batch size. * * @param batchSize the size of each batch for insertion * @param tableFields the key-value pair representing the table fields * @param records the list of records to be inserted * @param consumer the batch consumer for processing each batch of records * @param <T> the type of records * @return the total number of records successfully inserted */ public static <T> int batchInsert(int batchSize, @NonNull KeyValue<String, LinkedHashSet<String>> tableFields, @NonNull List<T> records, @NonNull BatchConsumer<? super T> consumer) { EntityManager entityManager = SpringUtil.getBean(EntityManager.class); return batchExecuteUpdate(batchSize, entityManager, tableFields, null, records, consumer); } /** * Batch insert records into the database. * * @param entityManager The entity manager * @param tableFields The table fields information * @param records The list of records to be inserted * @param consumer The consumer function interface for customizing the insert behavior * @param <T> The type of records * @return The number of records successfully inserted */ public static <T> int batchInsert(EntityManager entityManager, @NonNull KeyValue<String, LinkedHashSet<String>> tableFields, @NonNull List<T> records, @NonNull BatchConsumer<? super T> consumer) { return batchExecuteUpdate(DEFAULT_BATCH_SIZE, entityManager, tableFields, null, records, consumer); } /** * Executes batch insert or update operations on the database using native SQL with a default batch size. * * @param tableFields key-value pair representing the table name and its fields * @param updateFields set of fields to be updated if a record with matching primary key exists * @param records the list of records to be inserted or updated * @param consumer functional interface for accepting batch consumer operations * @param <T> the type of the records to be inserted or updated * @return the total number of rows affected by the batch operation */ public static <T> int batchInsertOrUpdate(@NonNull KeyValue<String, LinkedHashSet<String>> tableFields, @NonNull LinkedHashSet<String> updateFields, @NonNull List<T> records, @NonNull BatchConsumer<? super T> consumer) { return batchInsertOrUpdate(DEFAULT_BATCH_SIZE, tableFields, updateFields, records, consumer); } /** * Executes batch insert or update operations on the database using native SQL with a parameterized batch size. * * @param batchSize the size of each batch for insertion * @param tableFields key-value pair representing the table name and its fields * @param updateFields set of fields to be updated if a record with matching primary key exists * @param records the list of records to be inserted or updated * @param consumer functional interface for accepting batch consumer operations * @param <T> the type of the records to be inserted or updated * @return the total number of rows affected by the batch operation */ public static <T> int batchInsertOrUpdate(int batchSize, @NonNull KeyValue<String, LinkedHashSet<String>> tableFields, @NonNull LinkedHashSet<String> updateFields, @NonNull List<T> records, @NonNull BatchConsumer<? super T> consumer) { EntityManager entityManager = SpringUtil.getBean(EntityManager.class); return batchExecuteUpdate(batchSize, entityManager, tableFields, updateFields, records, consumer); } /** * Executes batch insert or update operations on the database using native SQL with a default batch size. * * @param entityManager The entity manager * @param tableFields key-value pair representing the table name and its fields * @param updateFields set of fields to be updated if a record with matching primary key exists * @param records the list of records to be inserted or updated * @param consumer functional interface for accepting batch consumer operations * @param <T> the type of the records to be inserted or updated * @return the total number of rows affected by the batch operation */ public static <T> int batchInsertOrUpdate(EntityManager entityManager, @NonNull KeyValue<String, LinkedHashSet<String>> tableFields, @NonNull LinkedHashSet<String> updateFields, @NonNull List<T> records, @NonNull BatchConsumer<? super T> consumer) { return batchExecuteUpdate(DEFAULT_BATCH_SIZE, entityManager, tableFields, updateFields, records, consumer); } /** * Executes batch updates on the database using native SQL with a parameterized batch size. * * @param batchSize the size of each batch for inserting records * @param entityManager the entity manager for creating and executing queries * @param tableFields key-value pair representing the table name and its fields * @param updateFields set of fields to be updated if a record with matching primary key exists (optional) * @param records the list of records to be inserted * @param consumer functional interface for accepting batch consumer operations * @param <T> the type of the records to be inserted * @return the total number of rows affected by the batch operation */ private static <T> int batchExecuteUpdate(int batchSize, EntityManager entityManager, @NonNull KeyValue<String, LinkedHashSet<String>> tableFields, @Nullable LinkedHashSet<String> updateFields, @NonNull List<T> records, @NonNull BatchConsumer<? super T> consumer) { if (records.isEmpty()) { log.debug("No records to process. The records list is empty."); return 0; } Assert.notNull(entityManager, "The entity manager must not be null."); Assert.isTrue(batchSize > 0 && batchSize < 500, "The batch size must be between 1 and 500."); AtomicInteger totalRows = new AtomicInteger(0); // Split the records into batches based on the specified batch size List<List<T>> recordBatches = CollUtil.split(records, batchSize); for (List<T> batchRecords : recordBatches) { AtomicInteger position = new AtomicInteger(1); // Generate the appropriate SQL statement for the batch String preparedStatementSql = CollUtil.isEmpty(updateFields) ? generateBatchInsertSql(tableFields, batchRecords.size()) : generateBatchInsertOrUpdateSql(tableFields, updateFields, batchRecords.size()); // Create a Query instance for executing native SQL statements Query nativeQuery = entityManager.createNativeQuery(preparedStatementSql); // Create a parameter builder instance using QueryParameterBuilder QueryParameterBuilder parameterBuilder = QueryParameterBuilder.create(nativeQuery, position); for (T record : batchRecords) { // Set parameters for the prepared statement consumer.accept(parameterBuilder, record); } // Execute the SQL statement and accumulate the affected rows totalRows.addAndGet(nativeQuery.executeUpdate()); } // Return the total number of affected rows return totalRows.get(); } /** * Generate batch insert SQL statement. * * <p> * This method generates an SQL statement for batch insertion into a specified table with the provided fields. * Example SQL statement: * <pre> * {@code INSERT INTO TABLE_NAME ( field_1, field_2 ) VALUES ( value_1, value_2 ), (value_3, value_4); } * </pre> * </p> * * @param tableFields The key-value pair representing the table name and its associated field set * @param batchSize The batch size for insertion * @return The batch insert SQL statement */ private static String generateBatchInsertSql(@NonNull KeyValue<String, LinkedHashSet<String>> tableFields, int batchSize) { String preparedStatementSql = generateInsertStatement(tableFields.getKey(), tableFields.getValue(), batchSize); if (log.isDebugEnabled()) { log.debug("[Batch Insert] Prepared {} records SQL: {}", batchSize, preparedStatementSql); } return preparedStatementSql; } /** * Generates SQL statement for batch insert with on duplicate key update. * * @param tableFields Key-value pair representing table name and its corresponding fields. * @param updateFields Fields to be updated in case of duplicate key. * @param batchSize Number of records to be inserted in a single batch. * @return SQL statement for batch insert with on duplicate key update. * @throws IllegalArgumentException if updateFields collection is empty. */ private static String generateBatchInsertOrUpdateSql(@NonNull KeyValue<String, LinkedHashSet<String>> tableFields, LinkedHashSet<String> updateFields, int batchSize) { Assert.notEmpty(updateFields, "Update field collection cannot be empty."); // Generate the insert statement String insertStatement = generateInsertStatement(tableFields.getKey(), tableFields.getValue(), batchSize); // Initialize StringBuilder with initial capacity StringBuilder builder = new StringBuilder(insertStatement.length() + 100); // Append insert statement builder.append(insertStatement).append(" ON DUPLICATE KEY UPDATE "); // Append update clause String updateClause = updateFields.stream() .map(updateField -> updateField + " = VALUES(" + updateField + ")") .collect(Collectors.joining(", ")); builder.append(updateClause); String preparedStatementSql = builder.toString(); if (log.isDebugEnabled()) { log.debug("[Batch Insert On Duplicate Key Update] Prepared {} records SQL: {}", batchSize, preparedStatementSql); } return preparedStatementSql; } @NotNull private static String generateInsertStatement(@NonNull String tableName, @NonNull LinkedHashSet<String> fields, int batchSize) { Assert.hasText(tableName, "Table name cannot be empty."); Assert.notNull(fields, "Field collection cannot be empty."); // Set a reasonable initial capacity StringBuilder builder = new StringBuilder(fields.size() * 100); // Concatenate field names String fieldNames = String.join(", ", fields); String intoTemplate = String.format("INSERT INTO %s (%s) VALUES ", tableName, fieldNames); // Generate placeholders String placeholders = "(" + String.join(", ", Collections.nCopies(fields.size(), "?")) + ")"; // Construct the insert statement builder.append(intoTemplate); for (int i = 0; i < batchSize; i++) { if (i > 0) { builder.append(", "); } builder.append(placeholders); } return builder.toString(); } }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
kafka 重新分配partition和调整replica的数量实现
当需要提升Kafka集群的性能和负载均衡时,可通过kafka-reassign-partitions.sh命令手动重新分配Partition,增加节点后,可以将Topic的Partition的Leader节点均匀分布,以提高写入和消费速度,感兴趣的可以了解一下2022-03-03SpringBoot实现本地存储文件上传及提供HTTP访问服务的方法
这篇文章主要介绍了SpringBoot实现本地存储文件上传及提供HTTP访问服务,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2020-08-08Spring Data JPA实现分页Pageable的实例代码
本篇文章主要介绍了Spring Data JPA实现分页Pageable的实例代码,具有一定的参考价值,有兴趣的可以了解一下2017-07-07
最新评论