This was one of my sessions at the last JavaOne. This post expands on the subject and looks into a real application using the Batch JSR-352 API. This application integrates with the MMORPG World of Warcraft.

Since JSR-352 is a new specification in the Java EE world, I suspect that many people don’t know how to use it properly. It may also be a challenge to identify the use cases to which this specification applies. Hopefully this example helps you understand those use cases better.

Abstract

World of Warcraft is a game played by more than 8 million players worldwide. The service is offered by region: United States (US), Europe (EU), China and Korea. Each region has a set of servers called Realms, which you connect to in order to play the game. For this example, we are only looking into the US and EU regions.

One of the most interesting features of the game is that it allows you to buy and sell in-game goods, called Items, using an Auction House. Each Realm has two Auction Houses. On average, each Realm trades around 70,000 Items. Let’s crunch some numbers:

  • 512 Realms (US and EU)
  • 70 K Items per Realm
  • More than 35 M Items overall
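As a quick sanity check on the totals above, the multiplication can be written out. The class and method names here are purely illustrative and are not part of the sample application.

```java
// Back-of-the-envelope estimate of the data volume; names are illustrative only.
final class AuctionVolumeEstimate {

    // Multiplies in long arithmetic so larger inputs cannot overflow an int.
    static long totalItems(int realms, int itemsPerRealm) {
        return (long) realms * itemsPerRealm;
    }

    public static void main(String[] args) {
        long total = totalItems(512, 70_000);
        System.out.println(total); // 35840000, a little over 35 M
    }
}
```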

The Data

Another cool thing about World of Warcraft is that the developers provide a REST API to access most of the in-game information, including Auction House data. Check here for the complete API.

An Auction House’s data is obtained in two steps. First we need to query the corresponding Auction House Realm REST endpoint to get a reference to a JSON file. Next we need to access this URL and download the file with all the Auction House Items information. Here’s an example.

The Application

Our objective here is to build an application that downloads the Auction House data, processes it and extracts metrics. These metrics are going to build a history of how Item prices evolve over time. Who knows? Maybe with this information we can predict price fluctuations and buy or sell Items at the best times.

The Setup

For the setup, we’re going to use a few things in addition to Java EE 7.

Jobs

The main work is going to be performed by Batch JSR-352 Jobs. A Job is an entity that encapsulates an entire batch process. A Job will be wired together via a Job Specification Language. With JSR-352, a Job is simply a container for the steps. It combines multiple steps that belong logically together in a flow.

We’re going to split the business logic into three jobs:

  • Prepare – Creates all the supporting data needed. Lists Realms and creates folders to copy files into.
  • Files – Queries Realms to check for new files to process.
  • Process – Downloads the file, processes the data and extracts metrics.

The Code

Back-end – Java EE 7 with Java 8

Most of the code is going to be in the back-end. We need Batch JSR-352, but we are also going to use a lot of other technologies from Java EE, such as JPA, JAX-RS, CDI and JSON-P.

Since the Prepare Job is only to initialize application resources for the processing, I’m skipping it and diving into the most interesting parts.

Files Job

The Files Job is an implementation of AbstractBatchlet. A Batchlet is the simplest processing style available in the Batch specification. It’s a task-oriented step where the task is invoked once, executes, and returns an exit status. This type is most useful for performing a variety of tasks that are not item-oriented, such as executing a command or doing a file transfer. In this case, our Batchlet is going to iterate over every Realm, make a REST request to each one and retrieve a URL for the file containing the data that we want to process. Here is the code:

@Named
public class LoadAuctionFilesBatchlet extends AbstractBatchlet {
    @Inject
    private WoWBusiness woWBusiness;

    @Inject
    @BatchProperty(name = "region")
    private String region;
    @Inject
    @BatchProperty(name = "target")
    private String target;

    @Override
    public String process() throws Exception {
        List<Realm> realmsByRegion = woWBusiness.findRealmsByRegion(Realm.Region.valueOf(region));
        realmsByRegion.parallelStream().forEach(this::getRealmAuctionFileInformation);

        return "COMPLETED";
    }

    void getRealmAuctionFileInformation(Realm realm) {
        Client client = null;
        try {
            client = ClientBuilder.newClient();
            Files files = client.target(target + realm.getSlug())
                                .request(MediaType.TEXT_PLAIN).async()
                                .get(Files.class)
                                .get(2, TimeUnit.SECONDS);

            files.getFiles().forEach(auctionFile -> createAuctionFile(realm, auctionFile));
        } catch (Exception e) {
            getLogger(this.getClass().getName()).log(Level.INFO, "Could not get files for " + realm.getRealmDetail());
        } finally {
            // Release the resources held by the per-request client.
            if (client != null) {
                client.close();
            }
        }
    }

    void createAuctionFile(Realm realm, AuctionFile auctionFile) {
        auctionFile.setRealm(realm);
        auctionFile.setFileName("auctions." + auctionFile.getLastModified() + ".json");
        auctionFile.setFileStatus(FileStatus.LOADED);

        if (!woWBusiness.checkIfAuctionFileExists(auctionFile)) {
            woWBusiness.createAuctionFile(auctionFile);
        }
    }
}

A cool thing about this is the use of Java 8. With parallelStream(), invoking multiple REST requests simultaneously is as easy as pie! You can really notice the difference. If you want to try it out, just run the sample, replace parallelStream() with stream() and compare. On my machine, using parallelStream() makes the task execute around 5 or 6 times faster.
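To get a feel for the comparison without calling the real API, here is a minimal standalone sketch. It assumes that a 100 ms sleep is a fair stand-in for a slow REST request; the class and method names are hypothetical and not part of the sample.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Stand-in for slow REST calls: each "request" just sleeps. All names are hypothetical.
final class ParallelRequestsSketch {

    // Simulates a blocking remote call that takes about 100 ms.
    static int slowRequest(int realmId) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return realmId * 2;
    }

    // One request after the other, on the calling thread.
    static List<Integer> sequential(List<Integer> realmIds) {
        return realmIds.stream()
                .map(ParallelRequestsSketch::slowRequest)
                .collect(Collectors.toList());
    }

    // Same work, spread over the threads of the common fork/join pool.
    static List<Integer> parallel(List<Integer> realmIds) {
        return realmIds.parallelStream()
                .map(ParallelRequestsSketch::slowRequest)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> realmIds = IntStream.rangeClosed(1, 16).boxed().collect(Collectors.toList());

        long start = System.nanoTime();
        List<Integer> fromSequential = sequential(realmIds);
        long sequentialMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        List<Integer> fromParallel = parallel(realmIds);
        long parallelMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("same results: " + fromSequential.equals(fromParallel));
        System.out.println("sequential ms: " + sequentialMs + ", parallel ms: " + parallelMs);
    }
}
```

Keep in mind that parallel streams run on the common fork/join pool, whose default size follows the number of CPU cores, so the speed-up you see depends on the machine.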

Update
Usually, I would not use this approach. I’ve done it here because part of the logic involves invoking slow REST requests, and parallelStream() really shines in this instance. Doing this with batch partitions is possible, but hard to implement. We also need to poll the servers for new data every time, so it’s not terrible if we skip a file or two. Keep in mind that if you don’t want to miss a single record, a Chunk processing style is more suitable. Thank you to Simon Martinelli for bringing this to my attention.

Since the Realms of US and EU require different REST endpoints, they are perfect candidates for partitioning. Partitioning means that the task is going to run in multiple threads, one thread per partition. In this case we have two partitions.

To complete the job definition we need to provide a Job XML file. This needs to be placed in the META-INF/batch-jobs directory. Here is the files-job.xml for this job:

<job id="loadRealmAuctionFileJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="loadRealmAuctionFileStep">
        <batchlet ref="loadAuctionFilesBatchlet">
            <properties>
                <property name="region" value="#{partitionPlan['region']}"/>
                <property name="target" value="#{partitionPlan['target']}"/>
            </properties>
        </batchlet>
        <partition>
            <plan partitions="2">
                <properties partition="0">
                    <property name="region" value="US"/>
                    <property name="target" value="http://us.battle.net/api/wow/auction/data/"/>
                </properties>
                <properties partition="1">
                    <property name="region" value="EU"/>
                    <property name="target" value="http://eu.battle.net/api/wow/auction/data/"/>
                </properties>
            </plan>
        </partition>
    </step>
</job>

In the files-job.xml we need to define our Batchlet in the batchlet element. For the partitions, just define the partition element and assign different properties to each partition of the plan. These properties can then be late bound into the LoadAuctionFilesBatchlet with the expressions #{partitionPlan['region']} and #{partitionPlan['target']}. This is a very simple expression binding mechanism and only works for simple properties and Strings.
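To make the late binding less magical, here is a toy resolver for partitionPlan expressions. This is not how the batch runtime is implemented, just a sketch of the substitution it performs for each partition. The class name, and the choice to resolve unknown names to an empty string, are my own assumptions.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy resolver for #{partitionPlan['name']} expressions. This is NOT the batch
// runtime's code, only an illustration of the substitution it performs.
final class PartitionPlanBindingSketch {

    private static final Pattern EXPRESSION =
            Pattern.compile("#\\{partitionPlan\\['([^']+)'\\]\\}");

    // Replaces every expression with the matching property of the current partition.
    // Assumption for this sketch: unknown names resolve to an empty string.
    static String resolve(String value, Properties partitionProperties) {
        Matcher matcher = EXPRESSION.matcher(value);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String replacement = partitionProperties.getProperty(matcher.group(1), "");
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        // One Properties object per partition, mirroring the plan in files-job.xml.
        Properties partition0 = new Properties();
        partition0.setProperty("region", "US");
        Properties partition1 = new Properties();
        partition1.setProperty("region", "EU");

        // The same expression yields a different value in each partition.
        System.out.println(resolve("#{partitionPlan['region']}", partition0));
        System.out.println(resolve("#{partitionPlan['region']}", partition1));
    }
}
```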

Process Job

Now we want to process the Realm Auction Data file. Using the information from the previous job, we can now download the file and do something with the data. The JSON file has the following structure:

{
    "realm": {
        "name": "Grim Batol",
        "slug": "grim-batol"
    },
    "alliance": {
        "auctions": [
            {
                "auc": 279573567,            // Auction Id
                "item": 22792,               // Item for sale Id
                "owner": "Miljanko",         // Seller Name
                "ownerRealm": "GrimBatol",   // Realm
                "bid": 3800000,              // Bid Value
                "buyout": 4000000,           // Buyout Value
                "quantity": 20,              // Numbers of items in the Auction
                "timeLeft": "LONG",          // Time left for the Auction
                "rand": 0,
                "seed": 1069994368
            },
            {
                "auc": 278907544,
                "item": 40195,
                "owner": "Mongobank",
                "ownerRealm": "GrimBatol",
                "bid": 38000,
                "buyout": 40000,
                "quantity": 1,
                "timeLeft": "VERY_LONG",
                "rand": 0,
                "seed": 1978036736
            }
        ]
    },
    "horde": {
        "auctions": [
            {
                "auc": 278268046,
                "item": 4306,
                "owner": "Thuglifer",
                "ownerRealm": "GrimBatol",
                "bid": 570000,
                "buyout": 600000,
                "quantity": 20,
                "timeLeft": "VERY_LONG",
                "rand": 0,
                "seed": 1757531904
            },
            {
                "auc": 278698948,
                "item": 4340,
                "owner": "Celticpala",
                "ownerRealm": "Aggra(Português)",
                "bid": 1000000,
                "buyout": 1000000,
                "quantity": 10,
                "timeLeft": "LONG",
                "rand": 0,
                "seed": 0
            }
        ]
    }
}

The file has a list of the Auctions from the Realm it was downloaded from. In each record we can check the item for sale, prices, seller and time left until the end of the auction. Auctions are also aggregated by Auction House type: Alliance and Horde.

For the process-job we want to read the JSON file, transform the data and save it to a database. This can be achieved by Chunk Processing. A Chunk is an ETL (Extract – Transform – Load) style of processing which is suitable for handling large amounts of data. A Chunk reads the data one item at a time, and creates chunks that will be written out, within a transaction. One item is read in from an ItemReader, handed to an ItemProcessor, and aggregated. Once the number of items read equals the commit interval, the entire chunk is written out via the ItemWriter, and then the transaction is committed.
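The read, process and write cycle described above can be modelled in a few lines of plain Java. This is a simplified sketch of the idea, not the runtime’s implementation: transactions, checkpoints, skips and retries are left out, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified model of a chunk step. A real runtime also manages transactions,
// checkpoints, skips and retries; none of that is modelled here.
final class ChunkLoopSketch {

    // Reads one item at a time, processes it, and hands a full chunk to the writer
    // every itemCount items. Returns the number of chunks written ("commits").
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int itemCount) {
        int commits = 0;
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next()));
            if (chunk.size() == itemCount) {
                writer.accept(new ArrayList<>(chunk)); // write, then "commit"
                chunk.clear();
                commits++;
            }
        }
        if (!chunk.isEmpty()) { // last, partially filled chunk
            writer.accept(new ArrayList<>(chunk));
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Integer> chunkSizes = new ArrayList<>();
        int commits = run(Arrays.asList(1, 2, 3, 4, 5).iterator(),
                (Integer item) -> "auction-" + item,
                (List<String> chunk) -> chunkSizes.add(chunk.size()),
                2);
        System.out.println(commits + " commits, chunk sizes " + chunkSizes);
    }
}
```

With an item count of 100, a file of 250 records would be written in three chunks of 100, 100 and 50 items.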

ItemReader

The real files are so big that loading them entirely into memory could exhaust it. Instead, we use the JSON-P API to parse the data in a streaming way.

@Named
public class AuctionDataItemReader extends AbstractAuctionFileProcess implements ItemReader {
    private JsonParser parser;
    private AuctionHouse auctionHouse;

    @Inject
    private JobContext jobContext;
    @Inject
    private WoWBusiness woWBusiness;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        setParser(Json.createParser(openInputStream(getContext().getFileToProcess(FolderType.FI_TMP))));

        AuctionFile fileToProcess = getContext().getFileToProcess();
        fileToProcess.setFileStatus(FileStatus.PROCESSING);
        woWBusiness.updateAuctionFile(fileToProcess);
    }

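    @Override
    public void close() throws Exception {
        // Release the parser and its underlying stream before marking the file as done.
        if (parser != null) {
            parser.close();
        }

        AuctionFile fileToProcess = getContext().getFileToProcess();
        fileToProcess.setFileStatus(FileStatus.PROCESSED);
        woWBusiness.updateAuctionFile(fileToProcess);
    }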

    @Override
    public Object readItem() throws Exception {
        while (parser.hasNext()) {
            JsonParser.Event event = parser.next();
            Auction auction = new Auction();
            switch (event) {
                case KEY_NAME:
                    updateAuctionHouseIfNeeded(auction);

                    if (readAuctionItem(auction)) {
                        return auction;
                    }
                    break;
            }
        }
        return null;
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }

    protected void updateAuctionHouseIfNeeded(Auction auction) {
        if (parser.getString().equalsIgnoreCase(AuctionHouse.ALLIANCE.toString())) {
            auctionHouse = AuctionHouse.ALLIANCE;
        } else if (parser.getString().equalsIgnoreCase(AuctionHouse.HORDE.toString())) {
            auctionHouse = AuctionHouse.HORDE;
        } else if (parser.getString().equalsIgnoreCase(AuctionHouse.NEUTRAL.toString())) {
            auctionHouse = AuctionHouse.NEUTRAL;
        }

        auction.setAuctionHouse(auctionHouse);
    }

    protected boolean readAuctionItem(Auction auction) {
        if (parser.getString().equalsIgnoreCase("auc")) {
            parser.next();
            auction.setAuctionId(parser.getLong());
            parser.next();
            parser.next();
            auction.setItemId(parser.getInt());
            parser.next();
            parser.next();
            parser.next();
            parser.next();
            auction.setOwnerRealm(parser.getString());
            parser.next();
            parser.next();
            auction.setBid(parser.getInt());
            parser.next();
            parser.next();
            auction.setBuyout(parser.getInt());
            parser.next();
            parser.next();
            auction.setQuantity(parser.getInt());
            return true;
        }
        return false;
    }

    public void setParser(JsonParser parser) {
        this.parser = parser;
    }
}

To open a JSON parser stream we call Json.createParser and pass it a reference to an InputStream. To read elements we just need to call the hasNext() and next() methods. next() returns a JsonParser.Event that allows us to check the position of the parser in the stream. Elements are read and returned in the readItem() method from the Batch API ItemReader. When no more elements are available to read, return null to finish the processing. Note that we also implement the open and close methods from ItemReader. These are used to initialize and clean up resources. They only execute once.

ItemProcessor

The ItemProcessor is optional. It’s used to transform the data that was read. In this case we need to add additional information to the Auction.

@Named
public class AuctionDataItemProcessor extends AbstractAuctionFileProcess implements ItemProcessor {
    @Override
    public Object processItem(Object item) throws Exception {
        Auction auction = (Auction) item;

        auction.setRealm(getContext().getRealm());
        auction.setAuctionFile(getContext().getFileToProcess());

        return auction;
    }
}

ItemWriter

Finally we just need to write the data down to a database:

@Named
public class AuctionDataItemWriter extends AbstractItemWriter {
    @PersistenceContext
    protected EntityManager em;

    @Override
    public void writeItems(List<Object> items) throws Exception {
        items.forEach(em::persist);
    }
}

The entire process with a file of 70 K records takes around 20 seconds on my machine. I did notice something very interesting. Before this code, I was using an injected EJB that called a method with the persist operation. This was taking 30 seconds in total, so injecting the EntityManager and performing the persist directly saved me a third of the processing time. I can only speculate that the delay is due to a deeper call stack, with EJB interceptors in the middle. This was happening in WildFly. I will investigate this further.

To define the chunk we need to add it to a process-job.xml file:

<step id="processFile" next="moveFileToProcessed">
    <chunk item-count="100">
        <reader ref="auctionDataItemReader"/>
        <processor ref="auctionDataItemProcessor"/>
        <writer ref="auctionDataItemWriter"/>
    </chunk>
</step>

In the item-count property we define how many elements fit into each chunk of processing. This means that the transaction is committed every 100 items. This is useful to keep the transaction size low and to checkpoint the data. If we need to stop and then restart the operation, we can do it without having to process every item again. We have to code that logic ourselves, though: the reader currently returns null from checkpointInfo() and ignores the checkpoint passed to open(). This is not included in the sample, but I will do it in the future.
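As a rough idea of what that restart logic could look like, here is a minimal sketch of a reader that remembers its position. It mirrors the open, readItem and checkpointInfo methods of the batch ItemReader, but deliberately does not implement the interface so that it runs standalone. The class name and the list-backed source are assumptions for illustration.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Mirrors the open/readItem/checkpointInfo methods of the batch ItemReader, but is
// kept free of the batch API so it runs standalone. Names are hypothetical.
final class RestartableReaderSketch {

    private final List<String> source;
    private int position; // index of the next item to read

    RestartableReaderSketch(List<String> source) {
        this.source = source;
    }

    // On a fresh start the checkpoint is null; on a restart it holds the saved position.
    void open(Serializable checkpoint) {
        position = (checkpoint == null) ? 0 : (Integer) checkpoint;
    }

    // Returns the next item, or null when the source is exhausted.
    String readItem() {
        return position < source.size() ? source.get(position++) : null;
    }

    // Snapshot of the current position; this value is handed back to open() on restart.
    Serializable checkpointInfo() {
        return position;
    }

    public static void main(String[] args) {
        List<String> auctions = Arrays.asList("a1", "a2", "a3", "a4");

        RestartableReaderSketch first = new RestartableReaderSketch(auctions);
        first.open(null);
        first.readItem();
        first.readItem();
        Serializable checkpoint = first.checkpointInfo(); // saved after two items

        RestartableReaderSketch restarted = new RestartableReaderSketch(auctions);
        restarted.open(checkpoint);
        System.out.println(restarted.readItem()); // continues with the third item
    }
}
```

With a streaming JSON parser there is no index to jump to, so a real reader would probably have to re-open the file and skip the already committed records before returning new ones.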

Running

To run a job we need to get a reference to a JobOperator. The JobOperator provides an interface to manage all aspects of job processing, including operational commands, such as start, restart, and stop, as well as job repository related commands, such as retrieval of job and step executions.

To run the previous files-job.xml Job we execute:

JobOperator jobOperator = BatchRuntime.getJobOperator();
jobOperator.start("files-job", new Properties());

Note that we pass the name of the job XML file, without the extension, to the JobOperator.

Next Steps

We still need to aggregate the data to extract metrics and display them on a web page. This post is already long, so I will describe the following steps in a future post. Anyway, the code for that part is already in the GitHub repo. Check the Resources section.

Resources

You can clone a full working copy from my GitHub repository and deploy it to WildFly. You can find instructions there to deploy it.

World of Warcraft Auctions

Also check the Java EE samples project, which has a lot of fully documented batch examples.

Java EE 7 Batch Processing and World of Warcraft – Part One

About The Author
- Freelancer. Passionate Java Developer.
