Pull to refresh

Spring: Реализация TaskExecutor c поддержкой транзакций

Reading time 7 min
Views 20K
Spring, позаботившись о разработчиках, предлагает удобный и простой фасад для взаимодействия с менеджером транзакций. Однако всегда ли стандартного механизма будет достаточно для реализации изощрённых архитектурных идей? Очевидно — нет.

В этом посте пойдёт речь о возможностях Spring —
  • взглянем на примеры стандартного управления транзакциями с помощью аннотаций,
  • поймём — когда решить задачу с помощью аннотаций не получится,
  • и, судя по заголовку статьи, дадим пример реализации транзакционного исполнения кода в новом потоке, создавемых с помощью Spring TaskExecutor.


Наиболее общеупотребимо задание транзакций с помощью аннотации @Transactional. Что бы воспользоваться этим механизмом достаточно сконфигурировать предпочитаемый TransactionManager и включить обработку аннотаций. В случае конфигурации с помощью XML файлов это выглядит примерно так:
    .....
     <bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
        <property name="dataSource" ref="dataSource" />
        <property name="entityManagerFactory" ref="entityManagerFactory" />
        <property name="jpaDialect" ref="jpaDialect"/>
    </bean>
    <tx:annotation-driven transaction-manager="transactionManager"/>
    ....


Использовать далее это так же просто как «навесить» аннотацию @Transactional на реализацию метода интерфейса или над всем классом-реализацией.
@Service
public class FooServiceImpl implements FooService {
    @Autowired
    private FooDao fooDao;

    @Transactional
    @Override
    public void update(Foo entity)  {
        fooDao.merge(entity);
    }
}

Однако, что бы эффективно использовать этот механизм нужно помнить о нескольких, невсегда очевидных тонкостях:
  • Класс должен быть объявлен как bean (c помощью аннотаций, code-based или в xml конфигурации контейнера)
  • Класс должен реализовывать интерфейс, иначе контейнеру будет сложно создать прокси-объект, с помощью которого и выполняется управление транзакцией.
  • Вызов транзакционных методов из другого метода того же класса не приведёт к созданию транзакции! (следствие из предыдущего пункта)

Подобный механизм позволяет не писать код управления транзакцией каждый раз!

Однако, могут ли быть ситуации, когда этого механизма будет недостаточно? или возможен ли контекст, в котором аннотациями обойтись не удастся? Боюсь, ответ очевиден. К примеру, мы можем захотеть выполнить часть кода в одной транзакции, а другую часть – во второй (тут скорее архитектурно верным будет разделить метод на два). У читателей, думаю, есть и свои примеры.

Более реалистичен пример, когда часть кода нужно выполнить асинхронно:
@Service
public class FooServiceImpl implements FooService {
    @Autowired
    private TaskExecutor taskExecutor;
    @Autowired
    private FooDao fooDao;

    @Transactional
    @Override
    public void update(Foo entity)  {
        fooDao.merge(entity);
        taskExecutor.run(new Runnable() {
            public void run() {
                someLongTimeOperation(entity);
            }
        });
    }
    
    @Transactional
    @Override
    public void someLongTimeOperation(Foo entity)  {
        // тут набор ресурсоёмких операций
    }
}


Что же получается: до старта метода update() создаётся транзакция, затем выполняются операции из тела, а по выходу из метода транзакция закрывается. Но в нашем случае создаётся новый поток, в котором будет исполнен код. И весьма очевидно, что на момент выхода из метода update() и сопутствующего уничтожения транзакции, выполнение кода во втором запущенном потоке может/будет продолжаться. Как итог, по завершению метода, во втором потоке получим исключение и вся транзакция «ролбэкнится».

К предудщему примеру добавим ручное создание транзакции:
@Service
public class FooServiceImpl implements FooService {
    @Autowired
    private TaskExecutor taskExecutor;
    @Autowired
    private FooDao fooDao;

    @Transactional
    @Override
    public void update(final Foo entity)  {
       fooDao.merge(entity);
       final TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
       taskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                    @Override
                    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        someLongTimeOperation(entity);
                    }
                });
            }
        });     
    }

    @Transactional
    @Override
    public void someLongTimeOperation(Foo entity)  {
        // тут набор ресурсоёмких операций
    }
}


Теперь someLongTimeOperation() исполняется асинхронно и в выделенной транзакции. Однако, хочется обощённой реализации, что бы не дублировать громоздкий код ручного управления.

Что ж… вот и она:
public interface TransactionalAsyncTaskExecutor extends AsyncTaskExecutor {
    void execute(Runnable task, Integer propagation, Integer isolationLevel);
}

public class DelegatedTransactionalAsyncTaskExecutor implements InitializingBean, TransactionalAsyncTaskExecutor {
    private PlatformTransactionManager transactionManager;
    private AsyncTaskExecutor delegate;
    private TransactionTemplate sharedTransactionTemplate;

    public DelegatedTransactionalAsyncTaskExecutor() {
    }

    public DelegatedTransactionalAsyncTaskExecutor(PlatformTransactionManager transactionManager, AsyncTaskExecutor delegate) {
        this.transactionManager = transactionManager;
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable task, Integer propagation, Integer isolationLevel) {
        final TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
        transactionTemplate.setPropagationBehavior(propagation);
        transactionTemplate.setIsolationLevel(isolationLevel);
        delegate.execute(new Runnable() {
            @Override
            public void run() {
                transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                    @Override
                    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        task.run();
                    }
                });
            }
        });
    }

    @Override
    public void execute(final Runnable task) {
        execute(task, TransactionDefinition.PROPAGATION_REQUIRED, TransactionDefinition.ISOLATION_DEFAULT);
    }

    @Override
    public void execute(final Runnable task, long startTimeout) {
        final TransactionTemplate transactionTemplate = getSharedTransactionTemplate();
        delegate.execute(new Runnable() {
            @Override
            public void run() {
                transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                    @Override
                    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        task.run();
                    }
                });
            }
        }, startTimeout);

    }

    @Override
    public Future<?> submit(final Runnable task) {
        final TransactionTemplate transactionTemplate = getSharedTransactionTemplate();
        return delegate.submit(new Runnable() {
            @Override
            public void run() {
                transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                    @Override
                    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        task.run();
                    }
                });
            }
        });
    }

    @Override
    public <T> Future<T> submit(final Callable<T> task) {
        final TransactionTemplate transactionTemplate = getSharedTransactionTemplate();
        return delegate.submit(new Callable<T>() {
            @Override
            public T call() throws Exception {
                return transactionTemplate.execute(new TransactionCallback<T>() {
                    @Override
                    public T doInTransaction(TransactionStatus status) {
                        T result = null;
                        try {
                            result = task.call();
                        } catch (Exception e) {
                            e.printStackTrace();
                            status.setRollbackOnly();
                        }
                        return result;
                    }
                });
            }
        });
    }

    public PlatformTransactionManager getTransactionManager() {
        return transactionManager;
    }

    public void setTransactionManager(PlatformTransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    public AsyncTaskExecutor getDelegate() {
        return delegate;
    }

    public void setDelegate(AsyncTaskExecutor delegate) {
        this.delegate = delegate;
    }

    public TransactionTemplate getSharedTransactionTemplate() {
        return sharedTransactionTemplate;
    }

    public void setSharedTransactionTemplate(TransactionTemplate sharedTransactionTemplate) {
        this.sharedTransactionTemplate = sharedTransactionTemplate;
    }

    @Override
    public void afterPropertiesSet() {
        if (transactionManager == null) {
            throw new IllegalArgumentException("Property 'transactionManager' is required");
        }
        if (delegate == null) {
            delegate = new SimpleAsyncTaskExecutor();
        }
        if (sharedTransactionTemplate == null) {
            sharedTransactionTemplate = new TransactionTemplate(transactionManager);
        }
    }
}


Это реализация есть обёртка, в итоге делигирующая вызовы к любому TaskExecutor коих несколько в составе Spring. При этом каждый вызов «завёрнут» в транзакцию. Вручную управлять транзакциями в Spring можно используя TransactionTemplate, а вот EntityManager#getTransaction() выдаёт исключение.

Ну и наконец около практический пример в действии:

Конфигурируем TaskExecutor:
    <bean id="transactionalTaskExecutor" class="ru.habrahabr.support.spring.DelegatedTransactionalAsyncTaskExecutor">
        <property name="delegate">
            <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
                <property name="threadNamePrefix" value="Habrahabr example - "/>
                <property name="threadGroupName" value="Habrahabr examples Group"/>
                <property name="corePoolSize" value="10"/>
                <property name="waitForTasksToCompleteOnShutdown" value="true"/>
            </bean>
        </property>
        <property name="transactionManager" ref="transactionManager"/>
    </bean>


Пример сервиса:

@Service
public class FooServiceImpl implements FooService {
    @Autowired
    private TransactionalAsyncTaskExecutor trTaskExecutor;
    @Autowired
    private FooDao fooDao;

    @Transactional
    @Override
    public void update(Foo entity)  {
        fooDao.merge(entity);                                   // Выполнится в транзакции созданной Spring'ом (tr_1).
        trTaskExecutor.run(new Runnable() {       // Запустится новый поток и новая транзакция (tr_2), метод run() выполнится паралельно текущему потоку и в рамках транзакции tr_2.
            public void run() {
                someLongTimeOperation();
            }
        });
    }                                                                             // Выход из метода и вместе с этим tr_1 завершится. Обрабока tr_2 осуществится с помощью TransactionTemplate.
     
    @Transactional
    @Override
    public void someLongTimeOperation(Foo entity)  {
        // тут набор ресурсоёмких операций
    }
}


Таким образом, имеем вполне обобщённую реализацию обёртки для TaskExecutor, позволяющую избежать дублирования кода создания транзакций.
Tags:
Hubs:
+8
Comments 11
Comments Comments 11

Articles