Cautious Broccoli is a lightweight, TypeScript-based batch processing framework inspired by Spring Batch. It provides a robust foundation for building batch applications with features like chunk processing, retry mechanisms, and modular job execution.
- Job Management: Organize batch processes into jobs with multiple steps
- Flexible Step Types: Support for both tasklet-based and chunk-oriented processing
- Retry Mechanisms: Built-in retry policies for handling transient failures
- Chunk Processing: Efficient processing of large datasets in chunks
- Type Safety: Full TypeScript support with generics for type-safe batch processing
- Extensible Architecture: Easy to extend and customize for specific needs
A Job is the main unit of work in TSBatch. It contains one or more Steps that are executed sequentially.
const job = new Job("MyJob");
job.addStep(step1);
job.addStep(step2);
await job.execute();
TSBatch supports two types of steps:
- TaskletStep: For simple, single-operation tasks
- ChunkStep: For processing data in chunks (read-process-write pattern)
class MyTasklet implements Tasklet {
async execute(context: ExecutionContext): Promise<RepeatStatus> {
// Do something
return RepeatStatus.FINISHED;
}
}
const taskletStep = new TaskletStep(new MyTasklet());
const chunkStep = new ChunkStep<InputType, OutputType>(
reader, // implements ItemReader<InputType>
processor, // implements ItemProcessor<InputType, OutputType>
writer, // implements ItemWriter<OutputType>
chunkSize // number
);
TSBatch provides robust retry mechanisms for handling transient failures:
const retryPolicy: RetryPolicy = {
maxRetries: 3,
delayMs: 500,
shouldRetry: (error) => error.message.includes("Transient")
};
interface Step {
execute(context: ExecutionContext): Promise<void>;
}
interface Tasklet {
execute(context: ExecutionContext): Promise<RepeatStatus>;
}
interface ItemReader<T> {
read(): Promise<T | null>;
}
interface ItemProcessor<I, O> {
process(item: I): Promise<O | null>;
}
interface ItemWriter<T> {
write(items: T[]): Promise<void>;
}
interface RetryPolicy {
maxRetries: number;
delayMs?: number;
shouldRetry?: (error: any) => boolean;
}
interface ChunkRetryPolicies {
readerRetryPolicy?: RetryPolicy;
processorRetryPolicy?: RetryPolicy;
writerRetryPolicy?: RetryPolicy;
}
// Create a simple tasklet
class PrintTasklet implements Tasklet {
constructor(private message: string) {}
async execute(context: ExecutionContext): Promise<RepeatStatus> {
console.log(this.message);
return RepeatStatus.FINISHED;
}
}
// Create and run a job
const job = new Job("SimpleJob");
job.addStep(new TaskletStep(new PrintTasklet("Hello, World!")));
await job.execute();
// Define retry policies
const chunkRetryPolicies: ChunkRetryPolicies = {
readerRetryPolicy: { maxRetries: 3, delayMs: 1000 },
processorRetryPolicy: { maxRetries: 2, delayMs: 500 },
writerRetryPolicy: { maxRetries: 3, delayMs: 1000 }
};
// Create a retryable chunk step
const retryableChunkStep = new RetryableChunkStep(
new DataReader(),
new DataProcessor(),
new DataWriter(),
10, // chunk size
chunkRetryPolicies
);
// Create and run the job
const job = new Job("ChunkJob");
job.addStep(retryableChunkStep);
await job.execute();
- Error Handling: Always implement proper error handling in your readers, processors, and writers.
- Chunk Size: Choose appropriate chunk sizes based on your data and memory constraints.
- Retry Policies: Configure retry policies based on the nature of potential failures.
- Context Usage: Use ExecutionContext to share data between steps when necessary.
- Type Safety: Leverage TypeScript's type system to ensure type safety throughout your batch process.
This project is licensed under the MIT License. See the LICENSE file for details.
This project is created by @maccals.
Contributions are welcome! Please feel free to submit a Pull Request.
This project uses commitlint to enforce commit message conventions. the commit message should be in the following format:
<type>(<scope>): <subject>
with the following types:
- feat: A new feature
- fix: A bug fix
- refactor: A code change that neither fixes a bug nor adds a feature
- perf: A code change that improves performance
- test: Adding missing tests
- build: Changes that affect the build system or external dependencies
- ops: Changes to the CI configuration files and scripts
- docs: Documentation only changes
- style: Changes that do not affect the meaning of the code (white-space, formatting, etc)
- merge: Merge branch 'pr-branch' into 'main'
and the scope should be the name of the package affected by the changes.