Jednoduché asynchronní zpracování událostí ve Springu

Spring framework má "od přírody" k dispozici implementaci Observer patternu. To není nic jiného než mechanismus "listenerů" tak, jak jej známe například ze Swingu. Základní a defaultní implementace je velmi jednoduchá, kdekoliv v managovaných beanách můžete přes tzv. Publisher (což je typicky aplikační kontext, kterým je daná beana vytvořena) vyslat informaci o události. Tuto událost pak může zpracovat jakákoliv třída implementující ApplicationListener rozhraní, a která je správně zaregistrovaná do fronty listenerů. Registrace se provádí velmi jednoduše - pouze deklarací beany v context.xml. AbstractApplicationContext (předchůdce všech specifických implementací aplikačního kontextu) při své inicializaci všechny beany implementující zmíněné rozhraní zaregistruje.

Výchozí implementace distribuce / zpracování eventů je velmi jednoduchá a také postačuje ve valné většině případů. Jedná se o synchronní zpracování vyslaných událostí. Tzn. okamžitě, jakmile přes Publisher zveřejníte událost, jsou notifikování všichni zaregistrovaní listeneři, kteří událost opět okamžitě ve stejném Threadu zpracují - a operace pokračuje dál, až všichni se svou činností skončí. Někdy se ovšem hodí, když se mohou vybrané události zpracovat asynchronně - nezávisle na threadu, který událost vyvolal.

Situace, kdy bychom mohli chtít oddělit zpracování událostí do jiného threadu, jsou mají společné tyto základní vlastnosti:

  • nejsou blokující pro poskytnutí odezvy uživateli (tzn. operace je pouze vedlejším produktem operace, kterou uživatel vykonal)
  • náklady na její zpracování jsou větší než nepatrné - tzn. zpracováním události v jiném vlákně urychlíme odezvu uživateli
  • o případné selhání této operace nemusí být uživatel nutně informován

Otázka zní, jak tohoto docílit ...

K dispozici jsou v podstatě dvě základní řešení:

Použití JMS - Java Messaging Service

Od verze 2.0 Spring Frameworku je možné pro asynchronní zpracování událostí použít JMS ze stacku J2EE. O této variantě se můžete více dočíst například v článku na serveru OnJava. JMS je pouze specifikace API, takže implementaci si posléze můžete zvolit. JMS jako takové je možné v řadě případů provozovat samostatně bez aplikačního serveru.

Základními výhodami této varianty je především:

  • robustnost,
  • spolehlivost (je garantováno doručení zpráv) - typicky jsou zprávy persistovány do databáze,
  • distribuovatelnost = škálovatelnost

Mezi nevýhody naopak patří:

  • složitější deployment,
  • zanesení další technologie (produktu) do projektu,
  • složitější správa.

Rekonfigurace Multicaster objektu na použití jiného TaskExecutoru

Pokud nemáme takové nároky na spolehlivost a škálovatelnost mechanismu pro asynchronní zpracování událostí, můžeme se vydat radikálně jednodušší cestou pouhé rekonfigurace Springu s doplněním asi tří chybějících tříd.

Spring pracuje při správě událostí s následujícími rozhraní a implementacemi:

  • ApplicationEventPublisher - obsahuje metodu publishEvent(ApplicationEvent event), přes kterou vaše třídy mohou "vysílat" své události; toto rozhraní typicky implementuje instance aplikačního kontextu, ke kterému se dostanete např. přes callback rozhraní ApplicationContextAware
  • ApplicationEventMulticaster - obsahuje metody jako je např. addListener / removeListener a také metodu multicastEvent(ApplicationEvent event); multicaster je používán Publisherem pro rozesílání událostí zaregistrovaným listenerům a také spravuje onu množinu registrovaných listenerů
  • SimpleApplicationEventMulticaster - toto je výchozí implementace multicasteru používaná Springem - obsahuje instanci TaskExecutoru, který zajišťuje zpracování jednotlivých listenerů
  • TaskExecutor - TaskExecutor se obecněji stará o běh Runnable instancí - zde jej zmiňuji proto, že každé zpracování události listenerem je obaleno do Runnable objektu
  • SyncTaskExecutor - výchozí implementace TaskExecutoru v SimpleApplicationEventMulticasteru; vykonává jednotlivé listenery ve stejném threadu jako běží aplikační objekt, který událost vyvolal (tzn. váš aplikační objekt čeká než SyncTaskExecutor vyřídí všechny listenery)

Pro to abychom tedy změnili výchozí zpracování událostí na asynchronní nám pouze stačí vyměnit implementaci TaskExecutoru například na TimerTaskExecutor, který provede běh listenerů asynchronně v novém threadu (nicméně instanciuje pouze jeden thread, ve kterém potom zpracování daných listenerů už provádí synchronně).

K tomu nám postačuje znát, že AbstractApplicationContext při své inicializaci nejdříve hledá v context.xml deklaraci beany s názvem "applicationEventMulticaster", kterou by dosadil na místo svého "multicastera" a teprve když ji nenajde vytvoří si vlastní defaultní instanci třídy SimpleApplicationEventMulticaster. Potom můžeme jednoduše v konfiguraci uvést:


<bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">
<property name="taskExecutor">
		<bean class="org.springframework.scheduling.timer.TimerTaskExecutor"/>
	</property>
</bean>

Tím ovšem způsobíme, že se v asynchronním vlákně vykonají VŠECHNY ApplicationListenery, které v deklaraci daného aplikačního kontextu uvedeme. Negativním nechtěným efektem je to, že pokud při zpracování události dojde k chybě = výjimce, nedoví se o tom aplikační objekt, který událost vyvolal a tím pádem např. neselže operace, kterou vyvolal uživatel. To je stav, o který typicky nestojíme - ve většině případů dokonce potřebujeme, aby při selhání operace v listeneru došlo k pozastavení zpracování prováděné operace a byl informován uživatel.

I toho ale lze poměrně jednoduše docílit. Nahradíme celý objekt multicasteru, který se stará o správu registrovaných listenerů a broadcasting událostí. Vytvoříme třídu, která při registraci listenery rozhodí do různých "podřízených" multicasterů podle určitého klíče. Tzn. vytvoří defakto dvouúrovňový strom multicasterů, kde každý zaregistrovaný listener bude zařazen do jednoho z multicasterů druhé úrovně. V druhé úrovni budeme potom mít dva multicastery - jeden se synchronním TaskExecutorem a druhý s asynchronním.


/**
 * DistributingEventMulticaster wraps different multicaster "contexts" allowing listener registration
 * only to those, that match filter criterias. Criteria specification and resolution is dedicated to IMulticasterFilterResolver.
 */
public class DistributingEventMulticaster implements ApplicationEventMulticaster {
	private static Log log = LogFactory.getLog(DistributingEventMulticaster.class);
	private IMulticasterFilterResolver multicasterFilterResolver;
	public IMulticasterFilterResolver getMulticasterFilterResolver() {
		return multicasterFilterResolver;
	}
	public void setMulticasterFilterResolver(IMulticasterFilterResolver multicasterFilterResolver) {
		this.multicasterFilterResolver = multicasterFilterResolver;
	}
	/**
	 * Add a listener to be notified of all events.
	 *
	 * @param listener the listener to add
	 */
	public void addApplicationListener(ApplicationListener listener) {
		ApplicationEventMulticaster multicaster =
				getMulticasterFilterResolver().getApplicableMulticastContexts(listener);
		multicaster.addApplicationListener(listener);
		if(log.isDebugEnabled()) {
			log.debug("Adding listener " + listener.getClass().getSimpleName() +
					" into context of multicaster " + multicaster.getClass().getSimpleName());
		}
	}
	/**
	 * Remove a listener from the notification list.
	 *
	 * @param listener the listener to remove
	 */
	public void removeApplicationListener(ApplicationListener listener) {
		ApplicationEventMulticaster multicaster =
				getMulticasterFilterResolver().getApplicableMulticastContexts(listener);
		multicaster.removeApplicationListener(listener);
		if(log.isDebugEnabled()) {
			log.debug("Removing listener " + listener.getClass().getSimpleName() +
					" from context of multicaster " + multicaster.getClass().getSimpleName());
		}
	}
	/**
	 * Remove all listeners registered with this multicaster.
	 * It will perform no action on event notification until more
	 * listeners are registered.
	 */
	public void removeAllListeners() {
		Collection multicastContexts =
				getMulticasterFilterResolver().getAllMulticastContexts();
		for(ApplicationEventMulticaster multicaster : multicastContexts) {
			multicaster.removeAllListeners();
			if(log.isDebugEnabled()) {
				log.debug("Removing all listeners from context of multicaster "
						+ multicaster.getClass().getSimpleName());
			}
		}
	}
	/**
	 * Multicast the given application event to appropriate listeners.
	 *
	 * @param event the event to multicast
	 */
	public void multicastEvent(ApplicationEvent event) {
		Collection multicastContexts =
				getMulticasterFilterResolver().getAllMulticastContexts();
		for(ApplicationEventMulticaster multicaster : multicastContexts) {
			multicaster.multicastEvent(event);
			if(log.isDebugEnabled()) {
				log.debug("Multicasting event " + event.getClass().getSimpleName() +
						" to context of multicaster " + multicaster.getClass().getSimpleName());
			}
		}
	}
}

Pro rozhození listenerů si nadefinujeme rozhraní IMulticasterFilterResolver, které nám zakrývá implementace jak z instance listeneru zjistit, který multicaster (synchronní / asynchronní) má být vlastně použit.


public interface IMulticasterFilterResolver {
	/**
	 * Returns collection of all available multicasters.
	 * @return
	 */
	public Collection getAllMulticastContexts();
	/**
	 * Returns multicaster where particular listener shall be registered.
	 *
	 * @param listener
	 * @return
	 */
	public ApplicationEventMulticaster getApplicableMulticastContexts(ApplicationListener listener);
}

Jako základní a jednoduchou implementaci můžeme použít rozeznávání listenerů podle jejich typu (nicméně stejně jednoduše můžeme vytvořit implementace rozeznávající typy listenerů např. na základě anotací, jejich pojmenování, zařazení ve struktuře packají atd. atd.). Podívejme se tedy do těla třídy ClassTypeMulticasterResolver.


public class ClassTypeMulticasterResolver implements IMulticasterFilterResolver {
	private List separatedMulticastContexts;
	public List getSeparatedMulticastContexts() {
		return separatedMulticastContexts;
	}
	public void setSeparatedMulticastContexts(List separatedMulticastContexts) {
		this.separatedMulticastContexts = separatedMulticastContexts;
	}
	/**
	 * Returns collection of all available multicasters.
	 *
	 * @return
	 */
	public Collection getAllMulticastContexts() {
		List result = new ArrayList();
		for(MulticasterContextConfig cfg : separatedMulticastContexts) {
			result.add(cfg.getMulticaster());
		}
		return result;
	}
	/**
	 * Returns multicaster where particular listener shall be registered.
	 *
	 * @param listener
	 * @return
	 */
	public ApplicationEventMulticaster getApplicableMulticastContexts(ApplicationListener listener) {
		ApplicationEventMulticaster result = null;
		//iterate through multicaster map to get the applicable multicaster list
		List contexts = getSeparatedMulticastContexts();
		Iterator iterator = contexts.iterator();
		for(MulticasterContextConfig cfg : contexts) {
			Set patterns = cfg.getPatternClasses();
			for(Class patternClass : patterns) {
				if(patternClass.isAssignableFrom(listener.getClass())) {
					return cfg.getMulticaster();
				}
			}
		}
		return null;
	}
	/**
	 * Holds single multicaster configuration.
	 */
	public static class MulticasterContextConfig {
		private ApplicationEventMulticaster multicaster;
		private Set patternClasses;
		public ApplicationEventMulticaster getMulticaster() {
			return multicaster;
		}
		public void setMulticaster(ApplicationEventMulticaster multicaster) {
			this.multicaster = multicaster;
		}
		public Set getPatternClasses() {
			return patternClasses;
		}
		public void setPatternClasses(Set patternClasses) {
			this.patternClasses = patternClasses;
		}
	}
}

Pokud se tedy vrátíme k našemu původnímu záměru - tedy mít některé listenery zpracovávané asynchronně a jiné synchronně - můžeme jednoduchou konfigurací výše uvedených tříd našeho záměru dosáhnout. Řekněme, že všechny listenery implementující "markable" rozhraní IAsynchronousListener (které neobsahuje žádné metody a slouží jen jako "diskriminátor") budou prováděny asynchronně a zbytek listenerů synchronně. Konfigurace by byla potom následující:


<!-- touto deklarací vnutíme aplikačnímu kontextu, aby jako multicaster použil tento námi definovaný -->
<bean id="applicationEventMulticaster" class="com.fg.trtn.business.multicaster.DistributingEventMulticaster">
        <!-- pro DistributingEventMulticaster nadefinujeme implementaci filtru,
              který vrací konkrétní multicastery druhé úrovně -->
<property name="multicasterFilterResolver">
		<!-- zvolíme filter rozeznávající listenery na základě typovosti -->
		<bean class="com.fg.trtn.business.multicaster.ClassTypeMulticasterResolver">
			<!-- zde nakonfigurujeme seznam dvou multicasterů - asynchronního a synchronního -->
			<!-- na pořadí záleží, lister bude zařazen k prvnímu multicasteru, kterému bude odpovídat jeho typ -->
<property name="separatedMulticastContexts">
<list>
					<!-- priority 1: asynchronní multicaster pro třídy implementující IAsyncListener -->
					<bean class="com.fg.trtn.business.multicaster.ClassTypeMulticasterResolver$MulticasterContextConfig">
<property name="multicaster">
							<bean id="asyncEventMulticaster"
								  class="org.springframework.context.event.SimpleApplicationEventMulticaster">
<property name="taskExecutor">
									<bean class="org.springframework.scheduling.timer.TimerTaskExecutor"/>
								</property>
							</bean>
						</property>
						<!-- seznam tříd, vůči kterým má být listener porovnáván při filtrování -->
<property name="patternClasses">
							<set>
								<value>com.fg.trtn.business.listener.IAsyncListener</value>
							</set>
						</property>
					</bean>
					<!-- priority 2: pro všechny ostatní listenery použij synchronní multicaster -->
					<bean class="com.fg.trtn.business.multicaster.ClassTypeMulticasterResolver$MulticasterContextConfig">
<property name="multicaster">
							<bean id="syncEventMulticaster"
								  class="org.springframework.context.event.SimpleApplicationEventMulticaster"/>
						</property>
						<!-- všechny listenery musí implementovat rozhraní ApplicationListener, -->
						<!-- takže zde skončí zbytek našich listenerů -->
<property name="patternClasses">
							<set>
								<value>org.springframework.context.ApplicationListener</value>
							</set>
						</property>
					</bean>
				</list>
			</property>
		</bean>
	</property>
</bean>

Tímto jsme poměrně jednoduše vyřešili náš problém s prováděním asynchronních událostí v aplikaci. V článku možná vypadá řešení možná složitěji, než ve skutečnosti (když si prohlédnete dané třídy) je. Výhodou řešení je, že nám stačí pouze Spring framework a vše zůstává průhledné a jednoduché. Nevýhodou naopak je, že pokud by zpracování listenerů bylo výpočetně náročnější, mohla by se nám plnit asynchronní fronta událostí, o které bychom při pádu / restartu serveru mohli přijít. Proto bych mezi asynchronní listenery nezařazoval žádné vitálně důležité funkce / operace.

Přestože by se mohlo na první pohled zdát, že pokud požadujete pro své operace zaručené zpracování / škálovatelnost, musíte jít cestou JMS, není to 100% pravda. Podobné vlastnosti vám může dodat i v případě druhého způsobu řešení (které je "pouze v paměti") např. nasazení clustrovaného řešení pro Spring od Terracota. V takovém případě by fronta s událostmi ke zpracování byla duplikovaná na více serverech, takže:

  • při selhání jednoho serveru, by se zpracování událostí postaral server druhý
  • zpracování událostí by se rozdělilo mezi více serverů a tudíž by řešení začalo "škálovat" :)