Batch Processing Aspect in Java

code_puzzleHi, today I made port of my Batch Processing Aspect in Java. It is quite a bit different aspect than I made previously in C#. In C#, I made something fully asynchronous, and in Java, I made an aspect that syncs invocations from many threads. I have in my mind uses of this aspect in web applications or web API or web service implementation with a static controller field, for example, named “batchProcessingAspect” and invokes changes in the database in sync. However, the aspect makes processing the database request and transforms them into batches. Most of the solution is a simulation of the multithreading environment that is by nature in mentioned kinds of applications where client’s requests are “attacking” the aspect. In multi-threading solution designs, I like attacking metaphors where all you have to do is make your solution threads-(attacking)-safe.

package net.codingbytodesign;
import java.util.*;
import java.util.concurrent.*;
interface Action<T> {
    public void invoke(T arg);
}
class ActionImpl<T> implements Action<T> {
    Action<T> action;
    public ActionImpl(Action<T> action) {
        this.action = action;
    }
    @Override
    public void invoke(T arg) {
        this.action.invoke(arg);
    }
}
class WaitNotify {
    private Boolean bool = true;
    private final Object lock = new Object();
    public Boolean waitDone(){
        if (bool) {
            synchronized (lock) {
                while (bool) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        return bool;
    }
    public void setDone(){
        synchronized(lock){
            bool = false;
            lock.notify();
        }
    }
}
class BatchProcessingAspectEntity {
    public WaitNotify waitNotify = new WaitNotify();
}
class BatchProcessingAspect<T extends BatchProcessingAspectEntity> {
    Action<List<T>> actionBatch;
    Consumer<T> consumer;
    public BatchProcessingAspect(
            Action<List<T>> actionBatch,
            int maxCount,
            int maxTimeout) {
        this.actionBatch = actionBatch;
        this.consumer = new Consumer<T>(
                this.actionBatch, maxCount, maxTimeout);
        Thread thread = new Thread(this.consumer);
        thread.start();
    }
    class Consumer<T extends BatchProcessingAspectEntity> extends Thread {
        Action<List<T>> actionBatch;
        int maxCount;
        int maxTimeout;
        Date current = new Date(System.currentTimeMillis() + maxTimeout);
        ConcurrentLinkedQueue<T> entities = new ConcurrentLinkedQueue<T>();
        public Consumer(
                Action<List<T>> actionBatch,
                int maxCount,
                int maxTimeout) {
            super("Consumer");
            this.actionBatch = actionBatch;
            this.maxCount = maxCount;
            this.maxTimeout = maxTimeout;
        }
        @Override
        public void run() {
            while (true) {
                while(entities.isEmpty()) {
                    try {
                        Thread.sleep(10);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                List<T> list = new ArrayList<T>();
                while (!entities.isEmpty() &&
                        list.size() < maxCount &&
                        new Date(System.currentTimeMillis()).before(current)) {
                    T data = entities.poll();
                    list.add(data);
                }
                if (list.size() > 0) {
                    actionBatch.invoke(list);
                    for (T entity : list) {
                        entity.waitNotify.setDone();
                    }
                }
                current = new Date(System.currentTimeMillis() + maxTimeout);
            }
        }
        void batchInvocation(T entity)
        {
            entities.add(entity);
            entity.waitNotify.waitDone();
        }
        Boolean isEmpty()
        {
            return entities.isEmpty();
        }
    }
    public void batchInvocation(T entity)
    {
        this.consumer.batchInvocation(entity);
    }
    public Boolean isEmpty()
    {
        return this.consumer.isEmpty();
    }
}
class Task extends Thread {
    public boolean running;
    public void run(){
        return;
    }
}
class MultiThreaded {
    public HashSet<Task> tasks = new HashSet<Task>();
    public void startTask(Task task) {
        task.start();
        tasks.add(task);
    }
    public Task getTask(int id) {
        for(Task task : tasks){
            if(task.getId() == id)
                return task;
        }
        return null;
    }
    public boolean processing() {
        boolean processing = false;
        for(Task task: tasks){
            if(task.isAlive())
                processing = true;
        }
        return processing;
    }
}
class Entity extends BatchProcessingAspectEntity {
    public int value = 0;
    public Entity(int value) {
        this.value = value;
    }
    public void print() {
        System.out.println(value);
    }
}
public class MainTester {
    static void processEntity(List<Entity> entiies) {
        System.out.println("Entities Count: " + entiies.size());
    }
    static BatchProcessingAspect<Entity> batchProcessingAspect
            = new BatchProcessingAspect<Entity>(
            new ActionImpl<List<Entity>>(
                    entities -> processEntity(entities)),
            20,
            200
    );
    public static void main(String[] args) {
        List<Entity> values = new ArrayList<Entity>();
        for(int i = 0; i < 1000; ++i)
            values.add(new Entity(i));
        int block = 2;
        int cursor = 0;
        List<Entity> objectList = new ArrayList<Entity>();
        MultiThreaded multiThreaded = new MultiThreaded();
        for(Entity v : values) {
            cursor++;
            objectList.add(v);
            if(cursor < block)
                continue;
            final List<Entity> objectListCopy = new ArrayList<Entity>();
            objectListCopy.addAll(objectList);
            Task task = new Task() {
                @Override
                public void run() {
                    for(Entity entity : objectListCopy)
                        batchProcessingAspect.batchInvocation(entity);
                }
            };
            multiThreaded.startTask(task);
            objectList.clear();
            cursor = 0;
        }
        while (multiThreaded.processing() || !batchProcessingAspect.isEmpty()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            continue;
        }
        System.out.println("move on");
    }
}

Thanks for reading!

p ;).

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.