Serialization Issues with Quartz and PostgreSQL – Invalid Header Stream!


I have been working on a prototype using the Quartz Scheduler for my current work assignment. It has been an interesting week, and I have learned enough to fill more than a couple of blog posts. I think it is worth sharing because useful information for troubleshooting Quartz issues is hard to find online. I had a tough time figuring things out for myself, what with the schedule deadlines always looming over my head! Nevertheless, it made for a wholesome learning endeavor.

For this first post on the Quartz Scheduler and the live issues you are likely to run into with it, I want to concentrate on a tricky error that I hit while setting up a persistent JobStore for my prototype. Here are the details of the environment used for the prototype:

Quartz Scheduler 1.8.3, JDK 1.6.0_23, PostgreSQL 9.0.1

Now, I know that Quartz Scheduler 2.1 is the latest stable version, but due to project constraints I had to use version 1.8.3 for my prototype. Ironically, I first learnt the Quartz scheduling package using version 2.1! However, the issue that I was faced with can show up with basically any version of Quartz Scheduler. Alright, enough of foreplay, let’s get down to the dirty bit!

For some background, in brief: Quartz Scheduler allows jobs to be stored either in memory (RAMJobStore) or in persistent storage (JobStoreTX, JobStoreCMT). The problem with RAMJobStore is that once the JVM is shut down, all the jobs and triggers in the RAMJobStore instance are lost. This is obviously not ideal for a production-quality scheduling implementation. Of the other two, JobStoreTX persists jobs and their associated triggers (plus a bunch of other stuff that the Quartz Scheduler requires for proper functioning) in database tables (12 tables for version 1.8.3, 11 for version 2.1). JobStoreCMT does the same. The main difference is that JobStoreTX is ideal for stand-alone applications which handle their own database transactions, whereas JobStoreCMT (as the CMT in the name suggests) is meant for applications deployed in application containers (such as JBoss, WebLogic et al.) so that database transactions are handled by the container itself. For my project, however, I was constrained to use the Apache Tomcat server (6.0.24), so I had to make do with JobStoreTX. For more information, refer to the official documentation on the Quartz site.

So, on to the problem, which a lot of people might face and be flummoxed by, since the error message is not exactly intuitive. My prototype code is shown below. First off, the code that loads the scheduler and schedules the job:

package com.z0ltan.scheduling.prototype.jobstoretx;

import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;

import org.apache.log4j.Logger;
import org.quartz.CronExpression;
import org.quartz.CronTrigger;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerMetaData;
import org.quartz.impl.StdSchedulerFactory;

import com.z0ltan.scheduling.prototype.jobstoretx.job.StatefulLogTimeJob;

public class JobStoreTXTestScheduler {
	private static final Logger logger = Logger
			.getLogger(JobStoreTXTestScheduler.class);

	public static void main(String[] args) {
		JobStoreTXTestScheduler jsts = new JobStoreTXTestScheduler();
		Scheduler scheduler = null;

		try {
			scheduler = jsts.getScheduler();

			logger.info("Starting scheduler");
			scheduler.start();
			logger.info("Scheduler started at " + new Date());

			logger.info("Scheduling jobs");
			jsts.scheduleJob(scheduler, "StatefulLogTimeJob",
					"StatefulLogTimeJobGroup", StatefulLogTimeJob.class, 5);

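			try {
				// Keep the JVM alive for 24 minutes so the scheduler can run
				Thread.sleep(24 * 60 * 1000);
			} catch (InterruptedException ex) {
				// Preserve the interrupt status instead of swallowing it
				Thread.currentThread().interrupt();
			}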

		} catch (SchedulerException ex) {
			logger.error(ex.getLocalizedMessage());
		} finally {
			try {
				if (scheduler != null && !scheduler.isShutdown()) {
					scheduler.shutdown(true); // Wait for all running jobs
												// to be
												// completed
				}
			} catch (SchedulerException exc) {
				logger.error(exc.getLocalizedMessage());
			}
		}

	}

	private Scheduler getScheduler() throws SchedulerException {
		Scheduler scheduler = null;
		StdSchedulerFactory factory = new StdSchedulerFactory();
		factory.initialize("com/z0ltan/prototype/properties/jobstoretxscheduler.properties");

		scheduler = factory.getScheduler();

		// Log Scheduler Metadata
		SchedulerMetaData metadata = scheduler.getMetaData();
		logger.info("\tScheduler Name = " + metadata.getSchedulerName());
		logger.info("\tScheduler ID = " + metadata.getSchedulerInstanceId());
		logger.info("\tThreadPool class = " + metadata.getThreadPoolClass());
		logger.info("\tThreadPool size = " + metadata.getThreadPoolSize());
		logger.info("\tJobStore class = " + metadata.getJobStoreClass());

		return scheduler;
	}

	private void scheduleJob(Scheduler scheduler, String jobName,
			String jobGroup, Class jobClass, int noOfOccurrences) {
		JobDetail job = new JobDetail(jobName, jobGroup, jobClass);
		// Store the counters as ints, since the job reads them back with getInt()
		job.getJobDataMap().put("ITER_COUNT", 1);
		job.getJobDataMap().put("NO_OF_OCCURRENCES", noOfOccurrences);
		job.setDurability(true);

		int dayOfMonth = 30;
		GregorianCalendar cal = new GregorianCalendar();
		int year = cal.get(Calendar.YEAR);
		CronTrigger trigger1 = null;

		try {
			String cronExpressionString = "0 30 15 " + dayOfMonth + " * ?";
			CronExpression cron = new CronExpression(cronExpressionString);

			CronTrigger trigger = new CronTrigger();
			trigger.setName(jobName + "-Trigger");
			trigger.setGroup(jobGroup + "-TriggerGroup");
			trigger.setCronExpression(cron);
			trigger.setStartTime(new Date());
			trigger.setVolatility(false);
			scheduler.scheduleJob(job, trigger);
			logger.info("Job " + jobName + " scheduled successfully");
			logger.info(trigger.getNextFireTime().toString());

			// Register additional triggers for special cases
			// In the case of monthly recurrence pattern, if the day of month
			// selected is 29, 30 or 31 special care has to be taken
			switch (dayOfMonth) {
			case 29:
				if (!cal.isLeapYear(year)) {
					trigger1 = new CronTrigger();
					trigger1.setName(jobName + "-Trigger1");
					trigger1.setGroup(jobGroup + "-TriggerGroup");
					CronExpression cron1 = new CronExpression("0 30 15 L FEB ?");
					trigger1.setCronExpression(cron1);
					trigger1.setStartTime(new Date());
					trigger1.setJobName(jobName);
					trigger1.setJobGroup(jobGroup);
					trigger1.setVolatility(false);
					scheduler.scheduleJob(trigger1);
				}
				break;

			case 30:
				trigger1 = new CronTrigger();
				trigger1.setName(jobName + "-Trigger1");
				trigger1.setGroup(jobGroup + "-TriggerGroup");
				trigger1.setCronExpression("0 30 15 L FEB ?");
				trigger1.setStartTime(new Date());
				trigger1.setJobName(jobName);
				trigger1.setJobGroup(jobGroup);
				trigger1.setVolatility(false);
				scheduler.scheduleJob(trigger1);
				break;

			case 31:
				trigger1 = new CronTrigger();
				trigger1.setName(jobName + "-Trigger1");
				trigger1.setGroup(jobGroup + "-TriggerGroup");
				trigger1.setCronExpression("0 30 15 L FEB,SEP,APR,JUN,NOV ?");
				trigger1.setStartTime(new Date());
				trigger1.setJobName(jobName);
				trigger1.setJobGroup(jobGroup);
				scheduler.scheduleJob(trigger1);// This is how we add additional
												// triggers to an existing job
			}
		} catch (ParseException ex) {
			logger.error("Failed to schedule Job " + jobName
					+ " because of errors in the Cron Expression", ex);
			return;
		} catch (SchedulerException exc) {
			// Log the exception itself so that the nested cause is not lost
			logger.error("Failed to schedule Job " + jobName, exc);
		}
	}
}
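
The three switch branches above differ only in the cron expression they use, so that decision could be pulled out into a small helper. This is just a sketch: the class and method names (MonthEndFallback, fallbackCronFor) are made up for illustration, the 15:30 fire time is simply copied from the prototype, and, like the prototype, it only looks at the year in which the job is scheduled.

```java
import java.util.GregorianCalendar;

public class MonthEndFallback {

	/**
	 * Returns the cron expression for the extra "last day of month" trigger,
	 * or null when the chosen day exists in every month of the given year.
	 */
	public static String fallbackCronFor(int dayOfMonth, int year) {
		boolean leapYear = new GregorianCalendar().isLeapYear(year);
		switch (dayOfMonth) {
		case 29:
			// February has a 29th only in leap years
			return leapYear ? null : "0 30 15 L FEB ?";
		case 30:
			// February never has a 30th
			return "0 30 15 L FEB ?";
		case 31:
			// These months end on the 28th/29th or on the 30th
			return "0 30 15 L FEB,APR,JUN,SEP,NOV ?";
		default:
			return null;
		}
	}

	public static void main(String[] args) {
		System.out.println(fallbackCronFor(30, 2011));
		System.out.println(fallbackCronFor(29, 2012));
	}
}
```

With something like this, scheduleJob would only need one block that builds trigger1 when the returned expression is not null.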

And the corresponding Quartz properties file to customize the scheduler:

#Scheduler
org.quartz.scheduler.instanceName = JOBSTORETX_SCHEDULER
org.quartz.scheduler.instanceId = AUTO

#ThreadPool
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 24
org.quartz.threadPool.threadPriority = 5

#JobStore
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
#PostgreSQL 9.0
org.quartz.jobStore.useProperties = false
org.quartz.jobStore.dataSource = myDS
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.misfireThreshold = 120000
org.quartz.jobStore.maxMisfiresToHandleAtATime = 20
org.quartz.jobStore.dontSetAutoCommitFalse = false
org.quartz.jobStore.selectWithLockSQL = SELECT * from {0}LOCKS where LOCK_NAME=? FOR UPDATE
org.quartz.jobStore.txIsolationLevelSerializable = true

#DataSource
org.quartz.dataSource.myDS.driver = org.postgresql.Driver
org.quartz.dataSource.myDS.URL = jdbc:postgresql://localhost:2377/mydatasource
org.quartz.dataSource.myDS.user =evapa_system
org.quartz.dataSource.myDS.password =password
org.quartz.dataSource.myDS.maxConnections = 25

As can be seen in the properties file above, I have used the JobStoreTX class for persistence and pointed it to a DataSource with the URL jdbc:postgresql://localhost:2377/mydatasource. All seems about right at this stage. The code for the job that I am using is shown below for completeness:

package com.z0ltan.scheduling.prototype.jobstoretx.job;

import java.util.Date;
import java.util.logging.Logger;

import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.StatefulJob;
import org.quartz.Trigger;

public class StatefulLogTimeJob implements StatefulJob {
	private static final Logger logger = Logger.getLogger("StatefulLogTimeJob");

	public void execute(JobExecutionContext context)
			throws JobExecutionException {
		JobDetail jobDetail = context.getJobDetail();
		Trigger trigger = context.getTrigger();

		String jobName = jobDetail.getName();
		String triggerName = trigger.getName();
		String triggerGroup = trigger.getGroup();
		int iterCount = jobDetail.getJobDataMap().getInt("ITER_COUNT");
		int noOfOccurrences = jobDetail.getJobDataMap().getInt(
				"NO_OF_OCCURRENCES");

		if (iterCount >= (noOfOccurrences + 1)) { // Since the first iteration
													// starts from 1 and not 0
			logger.info("Job "
					+ jobName
					+ " has been stopped since it has reached the maximum no. of executions.");
			logger.info("Removing trigger for " + jobName
					+ " from the Scheduler");

			Scheduler scheduler = context.getScheduler();

			if (scheduler != null) {
				try {
					scheduler.unscheduleJob(triggerName, triggerGroup);
				} catch (SchedulerException ex) {
					throw new JobExecutionException(ex, false); // do not
																// restart job
				}
			}
			return;
		}

		logger.info("Starting execution of Job " + jobName + " Iteration "
				+ iterCount);
		logger.info("\tCurrent Time = " + new Date());
		logger.info("Finished execution of Job " + jobName + " Iteration "
				+ iterCount);
		jobDetail.getJobDataMap().put("ITER_COUNT", (iterCount + 1));
	}
}

I start the scheduler and it hums along nicely. The job and the trigger get stored in the tables and I can see them in pgAdmin III. Then I try to add the second trigger to the already scheduled job, as done in the line of code shown below:

scheduler.scheduleJob(trigger1);

At that point the whole scheduler comes crashing down with the following exception message:

org.quartz.JobPersistenceException: Couldn’t store trigger ‘StatefulLogTimeJob-Trigger1’ for ‘StatefulLogTimeJob’ job:invalid stream header: BB656430 [See nested exception: java.io.StreamCorruptedException: invalid stream header: BB656430]

What I did then was attach the source code for the Quartz JAR file (quartz-1.8.3.jar) and debug the whole code flow. As can be seen in the properties file, the scheduler is configured to use the PostgreSQLDelegate class that comes bundled with Quartz as the delegate class for database operations peculiar to PostgreSQL. While debugging, I found that the failure occurs when the scheduler tries to add the additional trigger, because it is unable to retrieve the job details from the database! It was a simple serialization issue. Still, I was stuck with no resolution in hand. I tried the StdJDBCDelegate class, but no luck there either.

At last I realized that perhaps the issue was with the PostgreSQL server itself. And boy, was I right! A little poking around the PostgreSQL documentation for the bytea datatype (the datatype used for binary strings, or blobs in common parlance) revealed a gory picture: PostgreSQL 9.0 changed the default output format of bytea to hex, which earlier versions of PostgreSQL do not support (they use the ‘escape’ format instead). That alone would have been alright, but the clincher is that the server accepts both hex and escape formats on input, while by default it returns only hex when the same data is read back! Whoa, Nelly. So I jumped up and checked the project settings in Eclipse.

Et voila! There was the issue: the project was configured to use an older version of the PostgreSQL JDBC driver (postgresql-8.4-701.jdbc4), whereas the correct version would be postgresql-9.0-802.jdbc4 (or above, as I found out by experimentation). So how was the rest of the project code working fine till now? Simple: none of the other tables actually persisted serialized objects. For Quartz, however, a job is represented by the JobDetail class, which has a Serializable component, the JobDataMap. The Quartz job retrieval code would therefore always fail, since it was trying to deserialize the binary stream retrieved from the qrtz_job_details table into a JobDataMap object, and the old driver did not understand the hex format that the server was now sending. That explains the weird error message shown before. Whew! So the fix was ultimately as simple as updating the PostgreSQL JDBC driver to the correct version!
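
To see where the odd BB656430 comes from, here is a small sketch. It assumes that the old driver decodes whatever text it receives using the pre-9.0 escape rules (a backslash followed by three octal digits). The decodeAsEscape method is my own simplified approximation of that logic, not the driver's actual code, and the class name is made up. Fed hex-format text, it turns the leading \xac into a garbage byte and passes the following characters through as plain text, so the Java serialization magic number ACED0005 never reaches ObjectInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

public class ByteaHeaderDemo {

	/** Simplified escape-format decoder: "\\" is a backslash, "\nnn" is an octal byte. */
	public static byte[] decodeAsEscape(String text) {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		int i = 0;
		while (i < text.length()) {
			char c = text.charAt(i);
			if (c == '\\' && i + 1 < text.length() && text.charAt(i + 1) == '\\') {
				out.write('\\');
				i += 2;
			} else if (c == '\\' && i + 3 < text.length()) {
				// Blindly treat the next three characters as octal digits
				int value = (text.charAt(i + 1) - '0') * 64
						+ (text.charAt(i + 2) - '0') * 8
						+ (text.charAt(i + 3) - '0');
				out.write(value); // only the low 8 bits are written
				i += 4;
			} else {
				out.write(c);
				i++;
			}
		}
		return out.toByteArray();
	}

	/** Formats the first four bytes as upper-case hex. */
	public static String header(byte[] bytes) {
		StringBuilder sb = new StringBuilder();
		for (int i = 0; i < 4 && i < bytes.length; i++) {
			sb.append(String.format("%02X", bytes[i] & 0xFF));
		}
		return sb.toString();
	}

	/** Returns the exception message if the stream header is rejected, else null. */
	public static String tryDeserialize(byte[] bytes) {
		try {
			new ObjectInputStream(new ByteArrayInputStream(bytes)).close();
			return null;
		} catch (IOException ex) {
			return ex.getMessage();
		}
	}

	public static void main(String[] args) {
		// Hex-format text for the first bytes of a serialized Java object
		byte[] mangled = decodeAsEscape("\\xaced0005737200156f72672e71");
		System.out.println(header(mangled));         // BB656430
		System.out.println(tryDeserialize(mangled)); // invalid stream header: BB656430
	}
}
```

The characters e, d and 0 that follow \xac are bytes 65, 64 and 30 in hex, which is exactly the tail of the header in the exception message.
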
In conclusion, whenever an existing project is upgraded (in this case, from PostgreSQL 8.4 to PostgreSQL 9.0.1), it is imperative to update all ancillary and related libraries, environment properties and other software dependencies along with it. Also, PostgreSQL sucks. Heh.
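
One way to catch this kind of mismatch early is to compare the driver and server versions when the application starts. The sketch below uses only the standard java.sql.DatabaseMetaData getters. The class and method names are hypothetical, and the rule "the driver should not be older than the server" is my own rule of thumb, not something PostgreSQL or Quartz prescribes, so treat a positive result as a warning rather than a verdict.

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;

public class DriverVersionCheck {

	/** True when the driver's major.minor version is lower than the server's. */
	public static boolean isDriverOlder(int driverMajor, int driverMinor,
			int serverMajor, int serverMinor) {
		if (driverMajor != serverMajor) {
			return driverMajor < serverMajor;
		}
		return driverMinor < serverMinor;
	}

	/** Reads both versions from the connection metadata and compares them. */
	public static boolean isDriverOlder(Connection connection) throws SQLException {
		DatabaseMetaData meta = connection.getMetaData();
		return isDriverOlder(meta.getDriverMajorVersion(), meta.getDriverMinorVersion(),
				meta.getDatabaseMajorVersion(), meta.getDatabaseMinorVersion());
	}

	public static void main(String[] args) {
		// The combination from this post: an 8.4 driver talking to a 9.0 server
		System.out.println(isDriverOlder(8, 4, 9, 0));
		// After the fix: a 9.0 driver talking to a 9.0 server
		System.out.println(isDriverOlder(9, 0, 9, 0));
	}
}
```

In a real application the Connection overload would be called once at startup and the result logged.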

17 thoughts on “Serialization Issues with Quartz and PostgreSQL – Invalid Header Stream!”

  1. Exactly the same problem with PostgreSQL 9.3: “invalid stream header: BB656430”.
    I forgot to add the PostgreSQL JDBC 9.3 driver to the Java Build Path libraries in Eclipse.
    (I still had the old jar file: PostgreSQL JDBC 8.2.)

  2. I had an odd problem whereby I wasn’t getting any exceptions but my JobDataMaps on both the JobDetail and the Triggers weren’t being displayed. There was data in the job_data columns in both of the relevant quartz tables in the form of “\x1231241231..” so something was being written, it just wasn’t being decoded back into useful information.

    Turns out the driver was incorrectly handling the decoding and nobody else seems to have documented this. Much thanks for taking the time to document your problem and solution. You saved me a lot of pain, thank you!

    1. I very well understand the frustration of not finding any useful information about problems like these! I had a harrowing time trying to figure out the problems, and the PostgreSQL forums were absolutely useless. When I found the root cause of the issue, I decided to post it so that at least other people suffering from similar problems could find some resource explaining why! Glad it could be of a little help to you! 🙂

  3. I am using PostgreSQL 9.3 and the JDBC driver version is 9.1-901-1.jdbc4. The qrtz_job_details schema looks like this:
    ee=# \d qrtz_job_details
    Table "public.qrtz_job_details"
          Column       |          Type          | Modifiers
    -------------------+------------------------+------------------------------------------------------
     job_name          | character varying(128) | not null
     job_group         | character varying(80)  | not null
     description       | character varying(120) |
     job_class_name    | character varying(200) | not null
     is_durable        | boolean                |
     is_nonconcurrent  | boolean                |
     is_update_data    | boolean                |
     sched_name        | character varying(120) | not null default 'TestScheduler'::character varying
     requests_recovery | boolean                |
     job_data          | bytea                  |
    Indexes:
        "qrtz_job_details_pkey" PRIMARY KEY, btree (sched_name, job_name, job_group)
        "idx_qrtz_j_grp" btree (sched_name, job_group)
    Referenced by:
        TABLE "qrtz_triggers" CONSTRAINT "qrtz_triggers_sched_name_fkey" FOREIGN KEY (sched_name, job_name, job_group) REFERENCES qrtz_job_details(sched_name, job_name, job_group)

    Job_data stored in this table is like : \xaced0005737200156f72672e71

    Still, I get the following error, the same one that you got:
    org.quartz.JobPersistenceException: Couldn’t store trigger ‘StatefulLogTimeJob-Trigger1’ for ‘StatefulLogTimeJob’ job:invalid stream header: BB656430 [See nested exception: java.io.StreamCorruptedException: invalid stream header: BB656430]

    Please let me know if I need to change something as well.

    1. Which version of JDBC driver are you using? Is it 4.x?

      Also, this earlier comment by Alex might be helpful to you:

      “Exactly the same problem with PostgreSQL 9.3: “invalid stream header: BB656430”.
      I forgot to add the PostgreSQL JDBC 9.3 driver to the Java Build Path libraries in Eclipse.
      (I still had the old jar file: PostgreSQL JDBC 8.2.)”
