/*
* Enhydra Java Application Server Project
*
* The contents of this file are subject to the Enhydra Public License
* Version 1.1 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License on
* the Enhydra web site ( http://www.enhydra.org/ ).
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific terms governing rights and limitations
* under the License.
*
* The Initial Developer of the Enhydra Application Server is Lutris
* Technologies, Inc. The Enhydra Application Server and portions created
* by Lutris Technologies, Inc. are Copyright Lutris Technologies, Inc.
* All Rights Reserved.
*
* Contributor(s):
*
* $Id: OutputStreamEventQueue.java,v 1.15 2005/06/13 09:26:06 draganr Exp $
*/
package com.lutris.util;
import java.lang.InterruptedException;
import java.io.OutputStream;
import java.io.IOException;
import java.util.Date;
import java.util.Vector;
import com.lutris.util.OutputStreamEventQueueEntry;
/**
A queue of "events". Each event is a write to the OutputStream.
This class implements both the reader and writer half of the queue. <P>
Each "event" is returned as an OutputStreamEventQueueEntry object. <P>
The "writer" interface is the set of OutputStream calls. This class
extends OutputStream, overriding all the OutputStream methods.
All data written to this object is stored up as "events" in an internal
queue. Each write is a separate event, so sometimes you will get one
event with data followed by one with just a newline. You can pass objects
of this class to anything that expects an OutputStream. Calls to the
OutputStream methods always return immediately. <P>
You may optionally specify a maximum number of bytes to store. After
each write to the queue, if the new total number of bytes is larger than
this limit, then the oldest entries are discarded until the total is
at or below the limit. This could result in throwing out all the elements
in the queue. Only the number of bytes of data written by the
<CODE>write()</CODE> methods is counted, not the additional date
stamping and object encapsulation. If you specify a size limit of
zero, then no limit is imposed. <P>
The "reader" interface is the getEvent() method. Calls to this method
will block until there is an event to return. Readers may also call the
hasEventsPending() method to see if there are any events. This call does
not block. <P>
One possible use for this class is with the PrintTransactionFilter class.
If you pass in an instance of this class as the OutputStream, you may
then fetch the logging data in a convenient way. <P>
@see com.lutris.util.OutputStreamEventQueueEntry
@see java.io.OutputStream
@see com.lutris.servlet.filter.PrintTransactionFilter
@author Andy John
*/
public class OutputStreamEventQueue extends OutputStream {

    // Is this "stream" open or closed? Writes to a closed stream are
    // rejected. Only read or written while holding the queue monitor.
    private boolean open;

    // The pending events, oldest first. This object is also the monitor
    // that guards all mutable state and that readers wait on.
    private final Vector queue;

    // The number of payload bytes currently held by the queued events.
    private int numBytes;

    // The maximum number of bytes to be stored. Zero indicates no limit.
    private final int maxBytes;

    /**
        Create a new, empty, OutputStreamEventQueue with no limit on the
        size of the data stored.
    */
    public OutputStreamEventQueue() {
        this(0);
    }

    /**
        Create a new, empty, OutputStreamEventQueue with a limit on the
        number of bytes it will store.

        @param maxBytes The maximum number of data bytes to keep queued,
                        or zero for no limit.
    */
    public OutputStreamEventQueue(int maxBytes) {
        this.maxBytes = maxBytes;
        this.numBytes = 0;
        this.open = true;
        this.queue = new Vector();
    }

    //--------------------------------------------------------------------
    //
    //          Writer functions
    //
    //--------------------------------------------------------------------

    /**
        Add an event to the queue containing one byte.

        @param b The byte to save in the queue.
        @exception IOException If close() has been called.
        @see java.io.OutputStream
    */
    public void write(int b) throws IOException {
        synchronized (queue) {
            if (!open)
                throw new IOException("Wrote to closed OutputStreamEventQueue.");
            // Exactly one byte is stored, so exactly one byte is counted.
            enqueue(new byte[] { (byte) b });
        }
    }

    /**
        Add an event to the queue containing an array of bytes.
        The array is copied, so the caller may reuse it afterwards.

        @param b The array of bytes to save in the queue.
        @exception IOException If close() has been called.
        @see java.io.OutputStream
    */
    public void write(byte b[]) throws IOException {
        synchronized (queue) {
            if (!open)
                throw new IOException("Wrote to closed OutputStreamEventQueue.");
            // Copy before queueing: the event must not change if the caller
            // later overwrites its buffer. A null argument fails here, before
            // anything has been added to the queue.
            enqueue((byte[]) b.clone());
        }
    }

    /**
        Add an event to the queue containing part of an array of bytes.
        Bytes b[off]..b[off+len-1] are used.

        @param b   The array of bytes to save part of in the queue.
        @param off The index of the first byte to save.
        @param len The number of bytes to save.
        @exception IOException If close() has been called.
        @exception IndexOutOfBoundsException If off or len is negative, or
                   off+len is larger than the length of b.
        @see java.io.OutputStream
    */
    public void write(byte b[],
                      int off,
                      int len) throws IOException {
        synchronized (queue) {
            if (!open)
                throw new IOException("Wrote to closed OutputStreamEventQueue.");
            // Written as "len > b.length - off" so that off + len cannot
            // overflow. Rejecting bad ranges up front keeps the queue
            // untouched on failure.
            if (off < 0 || len < 0 || len > b.length - off)
                throw new ArrayIndexOutOfBoundsException(
                    "off=" + off + ", len=" + len + ", b.length=" + b.length);
            byte[] slice = new byte[len];
            System.arraycopy(b, off, slice, 0, len);
            // Only the len bytes actually stored are counted, not b.length.
            enqueue(slice);
        }
    }

    /**
        Does nothing. There is no buffering; every write immediately adds an
        event to the queue. This method is provided because OutputStream
        does.

        @exception IOException If close() has been called.
        @see java.io.OutputStream
    */
    public void flush() throws IOException {
        synchronized (queue) {
            if (!open)
                throw new IOException("Flushed a closed OutputStreamEventQueue.");
        }
    }

    /**
        Closes the OutputStream. It is no longer available for writing.
        Any further writes will throw an IOException. <P>

        Closing the stream wakes up all the threads blocked on a read.
        If the queue is empty, they will all notice this and return null.
        This is your signal that there are no events and there never will be.

        @exception IOException If close() has already been called.
        @see java.io.OutputStream
    */
    public void close() throws IOException {
        synchronized (queue) {
            if (!open)
                throw new IOException("Closed an already closed OutputStreamEventQueue.");
            open = false;
            // Wake up all waiting readers, so they all notice we are closed.
            queue.notifyAll();
        }
    }

    /*
        Only call this if you are already synchronized on queue.
        Wraps data (which must be a private copy, never the caller's array)
        in a time-stamped entry, appends it, accounts for its size, enforces
        the size limit and wakes up one waiting reader.
    */
    private void enqueue(byte[] data) {
        OutputStreamEventQueueEntry evt = new OutputStreamEventQueueEntry();
        evt.when = new Date();
        evt.data = data;
        queue.addElement(evt);
        numBytes += data.length;
        limitQueueSize();
        queue.notify();  // Wake up one waiting reader.
    }

    /*
        Only call this if you are already synchronized on queue.
        Throw out the oldest events until we get down to or below our
        limit on the amount of data we can hold.
        This might leave the queue empty if the limit is small and
        the newest entry has a lot of data!
    */
    private void limitQueueSize() {
        if (maxBytes == 0)
            return;
        while (numBytes > maxBytes) {
            if (queue.isEmpty()) {
                // Nothing left to discard; resynchronize the counter.
                numBytes = 0;
                return;
            }
            removeOldest();
        }
    }

    /*
        Only call this if you are already synchronized on queue, and only
        when the queue is not empty.
        Removes the oldest entry, subtracts its size from the byte count
        (never letting the count go negative) and returns it.
    */
    private OutputStreamEventQueueEntry removeOldest() {
        OutputStreamEventQueueEntry evt =
            (OutputStreamEventQueueEntry) queue.firstElement();
        // Remove by position; removeElement(evt) would do an equals() search.
        queue.removeElementAt(0);
        numBytes -= evt.data.length;
        if (numBytes < 0)
            numBytes = 0;
        return evt;
    }

    //--------------------------------------------------------------------
    //
    //          Reader functions
    //
    //--------------------------------------------------------------------

    /**
        Get an event from the queue. This will block until there is an
        event to return. The data about the event is stored in a
        OutputStreamEventQueueEntry object. <P>

        If null is returned, that means there are no more events, and the
        stream is closed, so no further events will be generated. <P>

        If the calling thread is interrupted while waiting, this method keeps
        waiting, but the thread's interrupt status is set again before
        returning so the caller can still observe the interruption.

        @return A OutputStreamEventQueueEntry object describing the event,
                or null if the queue is empty and no more events are possible.
        @see com.lutris.util.OutputStreamEventQueueEntry
    */
    public OutputStreamEventQueueEntry getEvent() {
        boolean interrupted = false;
        try {
            synchronized (queue) {
                // Loop: a wakeup does not guarantee an entry is still there,
                // another reader may have taken it first.
                while (queue.isEmpty()) {
                    // Is it possible that more entries could show up?
                    if (!open)
                        return null;
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        // Remember the interrupt and keep waiting; it is
                        // re-asserted in the finally block below.
                        interrupted = true;
                    }
                }
                // Pop off the queue.
                return removeOldest();
            }
        } finally {
            if (interrupted)
                Thread.currentThread().interrupt();
        }
    }

    /**
        Is there an event waiting in the queue right now? This call will
        not block, however it is possible that another thread will take the
        event before you do, so your call to getEvent may still block.

        @return True if there is an event waiting in the queue.
    */
    public boolean hasEventsPending() {
        synchronized (queue) {
            return !queue.isEmpty();
        }
    }
}