Serialization Issues with Quartz and PostgreSQL – Invalid Stream Header!

I have been working on a prototype using the Quartz Scheduler for my current work assignment. It has been an interesting week, and I have had quite a few learning experiences that will make for more than a couple of blog posts. They seem especially worth sharing since there is a paucity of useful information online for troubleshooting Quartz issues. I had a tough time figuring things out for myself, what with the schedule deadlines always looming over my head! Nevertheless, it made for a wholesome learning endeavor.

For this first post on using the Quartz Scheduler and solving some of the live issues you are likely to encounter, I want to concentrate on a tricky error that I ran into while trying to set up a persistent JobStore for my prototype. Here are the details of the environment used for the prototype:

Quartz Scheduler 1.8.3, JDK 1.6.0_23, PostgreSQL 9.0.1

Now I know that Quartz Scheduler 2.1 is the latest stable version but, due to project constraints, I had to use version 1.8.3 for my prototype. Ironically, I first learnt the Quartz scheduling package using version 2.1! However, the issue that I was faced with can crop up in basically any version of Quartz Scheduler. Alright, enough of foreplay, let’s get down to the dirty bit!

For some background, in brief: Quartz Scheduler allows jobs to be stored either in memory (RAMJobStore) or in persistent storage (JobStoreTX, JobStoreCMT). The problem with RAMJobStore is that once the JVM is shut down, all the jobs and triggers that were present in the RAMJobStore instance are lost. This is obviously not ideal for a production-quality scheduling implementation. Of the other two types of storage, JobStoreTX persists jobs and their associated triggers (plus a bunch of other state that the Quartz Scheduler requires for proper functioning) in database tables (12 tables for version 1.8.3, 11 for version 2.1). JobStoreCMT does the same. The main difference is that JobStoreTX is ideal for stand-alone applications which handle their own database transactions, whereas JobStoreCMT (as the CMT in the name suggests) is meant for applications deployed in application containers (such as JBoss, WebLogic et al.) so that database transactions are managed by the container itself. For my project, however, I was constrained to use the Apache Tomcat server (6.0.24), so I had to make do with JobStoreTX. For more information, refer to the official documentation on the Quartz site.

So onto the problem, which a lot of people might face and be flummoxed by, since the error message is not exactly intuitive. My prototype code is shown below. First off, the code that loads the scheduler and schedules the job:

package com.z0ltan.scheduling.prototype.jobstoretx;

import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;

import org.apache.log4j.Logger;
import org.quartz.CronExpression;
import org.quartz.CronTrigger;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerMetaData;
import org.quartz.impl.StdSchedulerFactory;

import com.z0ltan.scheduling.prototype.test.job.stateful.StatefulLogTimeJob;

public class JobStoreTXTestScheduler {
	private static final Logger logger = Logger
			.getLogger(JobStoreTXTestScheduler.class);

	public static void main(String[] args) {
		JobStoreTXTestScheduler jsts = new JobStoreTXTestScheduler();
		Scheduler scheduler = null;

		try {
			scheduler = jsts.getScheduler();

			logger.info("Starting scheduler");
			scheduler.start();
			logger.info("Scheduler started at " + new Date());

			logger.info("Scheduling jobs");
			jsts.scheduleJob(scheduler, "StatefulLogTimeJob",
					"StatefulLogTimeJobGroup", StatefulLogTimeJob.class, 5);

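			try {
				// Keep the JVM alive for 24 minutes so the triggers can fire
				Thread.sleep(24 * 60 * 1000);
			} catch (InterruptedException ex) {
				// Restore the interrupt flag and fall through to shutdown
				Thread.currentThread().interrupt();
			}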

		} catch (SchedulerException ex) {
			logger.error(ex.getLocalizedMessage());
		} finally {
			try {
				if (scheduler != null && !scheduler.isShutdown()) {
					scheduler.shutdown(true); // Wait for all running jobs
												// to be
												// completed
				}
			} catch (SchedulerException exc) {
				logger.error(exc.getLocalizedMessage());
			}
		}

	}

	private Scheduler getScheduler() throws SchedulerException {
		Scheduler scheduler = null;
		StdSchedulerFactory factory = new StdSchedulerFactory();
		factory.initialize("com/z0ltan/prototype/properties/jobstoretxscheduler.properties");

		scheduler = factory.getScheduler();

		// Log Scheduler Metadata
		SchedulerMetaData metadata = scheduler.getMetaData();
		logger.info("\tScheduler Name = " + metadata.getSchedulerName());
		logger.info("\tScheduler ID = " + metadata.getSchedulerInstanceId());
		logger.info("\tThreadPool class = " + metadata.getThreadPoolClass());
		logger.info("\tThreadPool size = " + metadata.getThreadPoolSize());
		logger.info("\tJobStore class = " + metadata.getJobStoreClass());

		return scheduler;
	}

	private void scheduleJob(Scheduler scheduler, String jobName,
			String jobGroup, Class jobClass, int noOfOccurrences) {
		JobDetail job = new JobDetail(jobName, jobGroup, jobClass);
		job.getJobDataMap().put("ITER_COUNT", "1");
		job.getJobDataMap().put("NO_OF_OCCURRENCES",
				Integer.toString(noOfOccurrences));
		job.setDurability(true);

		int dayOfMonth = 30;
		GregorianCalendar cal = new GregorianCalendar();
		int year = cal.get(Calendar.YEAR);
		CronTrigger trigger1 = null;

		try {
			String cronExpressionString = "0 30 15 " + dayOfMonth + " * ?";
			CronExpression cron = new CronExpression(cronExpressionString);

			CronTrigger trigger = new CronTrigger();
			trigger.setName(jobName + "-Trigger");
			trigger.setGroup(jobGroup + "-TriggerGroup");
			trigger.setCronExpression(cron);
			trigger.setStartTime(new Date());
			trigger.setVolatility(false);
			scheduler.scheduleJob(job, trigger);
			logger.info("Job " + jobName + " scheduled successfully");
			logger.info(trigger.getNextFireTime().toString());

			// Register additional triggers for special cases
			// In the case of monthly recurrence pattern, if the day of month
			// selected is 29, 30 or 31 special care has to be taken
			switch (dayOfMonth) {
			case 29:
				if (!cal.isLeapYear(year)) {
					trigger1 = new CronTrigger();
					trigger1.setName(jobName + "-Trigger1");
					trigger1.setGroup(jobGroup + "-TriggerGroup");
					CronExpression cron1 = new CronExpression("0 30 15 L FEB ?");
					trigger1.setCronExpression(cron1);
					trigger1.setStartTime(new Date());
					trigger1.setJobName(jobName);
					trigger1.setJobGroup(jobGroup);
					trigger1.setVolatility(false);
					scheduler.scheduleJob(trigger1);
				}
				break;

			case 30:
				trigger1 = new CronTrigger();
				trigger1.setName(jobName + "-Trigger1");
				trigger1.setGroup(jobGroup + "-TriggerGroup");
				trigger1.setCronExpression("0 30 15 L FEB ?");
				trigger1.setStartTime(new Date());
				trigger1.setJobName(jobName);
				trigger1.setJobGroup(jobGroup);
				trigger1.setVolatility(false);
				scheduler.scheduleJob(trigger1);
				break;

			case 31:
				trigger1 = new CronTrigger();
				trigger1.setName(jobName + "-Trigger1");
				trigger1.setGroup(jobGroup + "-TriggerGroup");
				trigger1.setCronExpression("0 30 15 L FEB,SEP,APR,JUN,NOV ?");
				trigger1.setStartTime(new Date());
				trigger1.setJobName(jobName);
				trigger1.setJobGroup(jobGroup);
				scheduler.scheduleJob(trigger1);// This is how we add additional
												// triggers to an existing job
			}
		} catch (ParseException ex) {
			logger.error("Failed to schedule Job " + jobName
					+ " because of errors in the Cron Expression", ex);
			return;
		} catch (SchedulerException exc) {
			// Pass the exception along so the root cause shows up in the log
			logger.error("Failed to schedule Job " + jobName, exc);
		}
	}
}

And the corresponding Quartz properties file to customize the scheduler:

#Scheduler
org.quartz.scheduler.instanceName = JOBSTORETX_SCHEDULER
org.quartz.scheduler.instanceId = AUTO

#ThreadPool
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 24
org.quartz.threadPool.threadPriority = 5

#JobStore
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
# PostgreSQL 9.0
org.quartz.jobStore.useProperties = false
org.quartz.jobStore.dataSource = myDS
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.misfireThreshold = 120000
org.quartz.jobStore.maxMisfiresToHandleAtATime = 20
org.quartz.jobStore.dontSetAutoCommitFalse = false
org.quartz.jobStore.selectWithLockSQL = SELECT * from {0}LOCKS where LOCK_NAME=? FOR UPDATE
org.quartz.jobStore.txIsolationLevelSerializable = true

#DataSource
org.quartz.dataSource.myDS.driver = org.postgresql.Driver
org.quartz.dataSource.myDS.URL = jdbc:postgresql://localhost:2377/mydatasource
org.quartz.dataSource.myDS.user = evapa_system
org.quartz.dataSource.myDS.password = password
org.quartz.dataSource.myDS.maxConnections = 25

As can be seen in the properties file above, I have used the JobStoreTX class for persistence and pointed it to a DataSource with the URL jdbc:postgresql://localhost:2377/mydatasource. All seems about right at this stage. The code for the job that I am using is shown below for completeness:

package com.z0ltan.scheduling.prototype.test.job.stateful;

import java.util.Date;
import java.util.logging.Logger;

import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.StatefulJob;
import org.quartz.Trigger;

public class StatefulLogTimeJob implements StatefulJob {
	private static final Logger logger = Logger.getLogger("StatefulLogTimeJob");

	public void execute(JobExecutionContext context)
			throws JobExecutionException {
		JobDetail jobDetail = context.getJobDetail();
		Trigger trigger = context.getTrigger();

		String jobName = jobDetail.getName();
		String triggerName = trigger.getName();
		String triggerGroup = trigger.getGroup();
		int iterCount = jobDetail.getJobDataMap().getInt("ITER_COUNT");
		int noOfOccurrences = jobDetail.getJobDataMap().getInt(
				"NO_OF_OCCURRENCES");

		if (iterCount >= (noOfOccurrences + 1)) { // Since the first iteration
													// starts from 1 and not 0
			logger.info("Job "
					+ jobName
					+ " has been stopped since it has reached the maximum no. of executions.");
			logger.info("Removing trigger for " + jobName
					+ " from the Scheduler");

			Scheduler scheduler = context.getScheduler();

			if (scheduler != null) {
				try {
					scheduler.unscheduleJob(triggerName, triggerGroup);
				} catch (SchedulerException ex) {
					throw new JobExecutionException(ex, false); // do not
																// restart job
				}
			}
			return;
		}

		logger.info("Starting execution of Job " + jobName + " Iteration "
				+ iterCount);
		logger.info("\tCurrent Time = " + new Date());
		logger.info("Finished execution of Job " + jobName + " Iteration "
				+ iterCount);
		jobDetail.getJobDataMap().put("ITER_COUNT", (iterCount + 1));
	}
}

I start the scheduler and it hums along nicely. The job and the trigger get stored in the tables and I can see them in pgAdmin III. Then, when I try to add the second trigger to the already scheduled job, as done in the line of code shown below,

scheduler.scheduleJob(trigger1);

the whole scheduler comes crashing down with the following exception message:

org.quartz.JobPersistenceException: Couldn't store trigger 'StatefulLogTimeJob-Trigger1' for 'StatefulLogTimeJob' job:invalid stream header: BB656430 [See nested exception: java.io.StreamCorruptedException: invalid stream header: BB656430]

To get to the bottom of it, I attached the source code for the Quartz JAR (quartz-1.8.3.jar) and stepped through the whole code flow in the debugger. As can be seen in the properties file, the scheduler is configured to use the PostgreSQLDelegate class that comes bundled with Quartz as the delegate for PostgreSQL-specific database access. While debugging, I found that the failure occurs when the scheduler tries to add the additional trigger, because at that point it has to read the job details back from the database and cannot! It was a plain deserialization failure. Still, I was stuck with no resolution in hand. I tried the StdJDBCDelegate class but had no luck there either.

At last I realized that perhaps the issue was on the PostgreSQL side. And boy was I right! A little poking around the PostgreSQL documentation for the bytea datatype (the datatype used for binary strings, or blobs in common parlance) revealed a gory picture: PostgreSQL 9.0 changed the default output format of bytea to hex, a format that earlier versions of PostgreSQL do not support (they use the 'escape' format instead). That alone would have been alright, but the clincher is that the server accepts either hex or escape format on input yet returns only hex by default on output. A client that still expects the escape format can therefore write a value happily and then misread it when fetching it back from the same table! Whoa, Nelly. So I jumped up and checked the project settings in Eclipse.

Et voila! There was the issue: the project was configured to use an older version of the PostgreSQL JDBC driver (postgresql-8.4-701.jdbc4), whereas the correct version is postgresql-9.0-802.jdbc4 (or above, as I found out by experimentation). So how was the rest of the project code working fine till now? Simple: none of the other tables actually persisted Serializable objects. Quartz, however, represents a job with the JobDetail class, which has a Serializable component, the JobDataMap, stored in the qrtz_job_details table. The old driver misread the hex-encoded bytes coming back from that table, so the Quartz job retrieval code would always fail when it tried to deserialize them into a JobDataMap object. This neatly explains the weird error message shown before. Whew! So the fix was ultimately as simple as updating the PostgreSQL JDBC driver to the correct version!
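
The odd header in that exception can be reproduced without a database. The sketch below is a simplified model and not the actual driver code: toHexFormat renders bytes the way hex-format bytea text looks ("\x" followed by hex digits), and decodeAsEscape is a hypothetical escape-format decoder written on the assumption that an older client treats a backslash as the start of a three-digit octal escape and everything else as a literal byte. The class and method names are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.util.HashMap;

public class ByteaHexDemo {

    /** Renders bytes as hex-format bytea text: "\x" followed by two hex digits per byte. */
    static String toHexFormat(byte[] data) {
        StringBuilder sb = new StringBuilder("\\x");
        for (byte b : data) {
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    /**
     * Simplified escape-format decoder (an assumption, not driver code):
     * "\\" is a literal backslash, "\ooo" is read as three octal digits,
     * and every other character is taken as a literal byte.
     */
    static byte[] decodeAsEscape(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c != '\\') {
                out.write(c);
                continue;
            }
            char second = text.charAt(++i);
            if (second == '\\') {
                out.write('\\');
                continue;
            }
            // No validation: 'x', 'a', 'c' are blindly treated as octal digits
            int value = (second - '0') * 64
                    + (text.charAt(++i) - '0') * 8
                    + (text.charAt(++i) - '0');
            out.write(value); // write(int) keeps only the low 8 bits
        }
        return out.toByteArray();
    }

    /** Serializes a map, sends it through the mismatched formats, and reports the outcome. */
    static String describeRoundTrip() {
        try {
            HashMap<String, String> map = new HashMap<String, String>();
            map.put("ITER_COUNT", "1");
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bytes);
            oos.writeObject(map);
            oos.close();

            String wireText = toHexFormat(bytes.toByteArray()); // starts with \xaced0005
            byte[] misread = decodeAsEscape(wireText);
            new ObjectInputStream(new ByteArrayInputStream(misread)).readObject();
            return "deserialized";
        } catch (StreamCorruptedException ex) {
            return ex.getMessage();
        } catch (IOException ex) {
            throw new IllegalStateException(ex);
        } catch (ClassNotFoundException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        // Prints: invalid stream header: BB656430
        System.out.println(describeRoundTrip());
    }
}
```

Every Java serialization stream starts with the bytes AC ED 00 05, which show up as "\xaced0005" in hex format. Read as an escape sequence, "\xac" collapses into the single byte BB and the following characters 'e', 'd' and '0' pass through as 65, 64 and 30, which is exactly the BB656430 reported by Quartz.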
In conclusion, whenever an existing project is upgraded (in this case, from PostgreSQL 8.4 to PostgreSQL 9.0.1), make sure that all ancillary libraries, environment properties and other software dependencies are updated along with it. Also, PostgreSQL sucks. Heh.
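
One way to catch this kind of drift early is a sanity check at startup. The following is only a sketch under a stated assumption: that the JDBC driver's major.minor version is expected to be at least the server's, which matches the driver naming in this post (8.4-701, 9.0-802) but is a heuristic and not a rule the driver project guarantees. The class name and the warning text are hypothetical; the DatabaseMetaData calls are standard JDBC.

```java
import java.sql.DatabaseMetaData;
import java.sql.SQLException;

public class DriverVersionCheck {

    /** Returns true when the driver's major.minor version is older than the server's. */
    static boolean driverOlderThanServer(int driverMajor, int driverMinor,
            int serverMajor, int serverMinor) {
        if (driverMajor != serverMajor) {
            return driverMajor < serverMajor;
        }
        return driverMinor < serverMinor;
    }

    /** Logs a warning if the connected server is newer than the JDBC driver in use. */
    static void warnIfDriverIsStale(DatabaseMetaData meta) throws SQLException {
        boolean stale = driverOlderThanServer(
                meta.getDriverMajorVersion(), meta.getDriverMinorVersion(),
                meta.getDatabaseMajorVersion(), meta.getDatabaseMinorVersion());
        if (stale) {
            System.err.println("JDBC driver " + meta.getDriverVersion()
                    + " is older than database server "
                    + meta.getDatabaseProductVersion());
        }
    }

    public static void main(String[] args) {
        // The combination from this post: an 8.4 driver against a 9.0 server
        System.out.println(driverOlderThanServer(8, 4, 9, 0));
    }
}
```

In an application, warnIfDriverIsStale would be called once with connection.getMetaData() from any open connection, before the scheduler is started.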