An interesting, advanced feature of Pangool is its **rollup** capability.

There are cases where it is useful to perform aggregated sub-groupings within a Reducer.
Let’s pose an example.

Let’s imagine we want to process a set of Tweets and calculate the top N hashtags for each
location and date.

We could write a job that groups all tweets by (`location`, `date`, `hashtag`) and counts each hashtag.

Then, we could write a second job that performs the top N selection, grouping by (`location`, `date`).

However, using *rollup*, we can easily merge both steps into a single job.

Let’s visualize the intermediate schema that we will have in such a job, together with some data, in no particular order:

    [location, date, hashtag, count]
    "L1" "d1" "h1" 5
    "L1" "d1" "h2" 5
    "L1" "d2" "h1" 1
    "L1" "d2" "h1" 1
    "L1" "d1" "h1" 3
    "L1" "d1" "h2" 7

We want to group by (`location`, `date`, `hashtag`) to be able to aggregate all the counts for each (`location`, `date`, `hashtag`) group.

Therefore tuples will also be sorted by (`location`, `date`, `hashtag`).

At the end of each `reduce()` method we’ll output the total count:

    [location, date, hashtag, count]
    "L1" "d1" "h1" 5
    "L1" "d1" "h1" 3
      output("L1", "d1", "h1", 8)
    "L1" "d1" "h2" 5
    "L1" "d1" "h2" 7
      output("L1", "d1", "h2", 12)
    "L1" "d2" "h1" 1
    "L1" "d2" "h1" 1
      output("L1", "d2", "h1", 2)
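The `reduce()` pass above can be sketched as stand-alone Java, outside Hadoop. The class and record names here (`RollupReduceSim`, `Tweet`) are invented for illustration and are not part of Pangool; the sketch only assumes what the text states, namely that tuples arrive sorted by (`location`, `date`, `hashtag`):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone simulation of the reduce() pass over the sorted
// intermediate tuples shown above (hypothetical names, no Pangool API).
public class RollupReduceSim {

    // One intermediate tuple: [location, date, hashtag, count]
    public record Tweet(String location, String date, String hashtag, int count) {}

    // Walks the tuples in sorted order and emits one total per
    // (location, date, hashtag) group, as each reduce() call would.
    public static List<String> reduceGroups(List<Tweet> sorted) {
        List<String> output = new ArrayList<>();
        int sum = 0;
        for (int i = 0; i < sorted.size(); i++) {
            Tweet t = sorted.get(i);
            sum += t.count();
            boolean groupEnds = i == sorted.size() - 1
                || !sameGroup(t, sorted.get(i + 1));
            if (groupEnds) {
                output.add("output(" + t.location() + ", " + t.date()
                    + ", " + t.hashtag() + ", " + sum + ")");
                sum = 0;
            }
        }
        return output;
    }

    private static boolean sameGroup(Tweet a, Tweet b) {
        return a.location().equals(b.location())
            && a.date().equals(b.date())
            && a.hashtag().equals(b.hashtag());
    }

    public static void main(String[] args) {
        List<Tweet> sorted = List.of(
            new Tweet("L1", "d1", "h1", 5),
            new Tweet("L1", "d1", "h1", 3),
            new Tweet("L1", "d1", "h2", 5),
            new Tweet("L1", "d1", "h2", 7),
            new Tweet("L1", "d2", "h1", 1),
            new Tweet("L1", "d2", "h1", 1));
        reduceGroups(sorted).forEach(System.out::println);
    }
}
```

Running it prints the three `output(...)` lines from the trace above.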

However, we can leverage the fact that we are sorting by all the fields in the (`location`, `date`, `hashtag`)
group to add business logic every time one of these fields changes its value; in other words, to add **subgroups**.

This way, apart from having one `reduce()` method, we can also have `onOpen()`/`onClose()`-like methods for other fields of the group.
So we could modify our job to save each (`location`, `date`, `hashtag`) total count in an n-sized heap in memory.

Then, in the `onClose()` of the `date` field, we could simply flush the top n hashtags, the ones that remain in the heap, to the output.
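One way to implement the n-sized heap described above is a bounded min-heap built on the standard `PriorityQueue`. The class and method names here (`TopNHeap`, `offer`, `flush`) are made up for illustration; this is not a Pangool API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// A hypothetical n-sized heap: keeps only the n highest-counted
// hashtags seen so far for the currently open (location, date) group.
public class TopNHeap {
    private final int n;
    // Min-heap ordered by count: the smallest kept entry sits at the
    // head and is the first to be evicted when the heap overflows.
    private final PriorityQueue<Map.Entry<String, Integer>> heap;

    public TopNHeap(int n) {
        this.n = n;
        this.heap = new PriorityQueue<>(Map.Entry.comparingByValue());
    }

    // Called once per (location, date, hashtag) group with its total count.
    public void offer(String hashtag, int count) {
        heap.offer(Map.entry(hashtag, count));
        if (heap.size() > n) {
            heap.poll(); // evict the current minimum
        }
    }

    // Called from the onClose() of the date field: drain the survivors,
    // which are exactly the top n hashtags of the closing group.
    public List<Map.Entry<String, Integer>> flush() {
        List<Map.Entry<String, Integer>> top = new ArrayList<>();
        while (!heap.isEmpty()) {
            top.add(heap.poll());
        }
        return top;
    }
}
```

The min-heap keeps `offer()` at O(log n) per group regardless of how many hashtags a (location, date) pair contains.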

Let’s imagine we want to get the “top 1” hashtag for each (`location`, `date`). Let’s assume we rollup from `date`:

    [location, date, hashtag, count]
    "L1" "d1" "h1" 5
    "L1" "d1" "h1" 3
      reduce("L1", "d1", "h1"): add to a 1-sized heap: (h1, 8)
    "L1" "d1" "h2" 5
    "L1" "d1" "h2" 7
      reduce("L1", "d1", "h2"): add to a 1-sized heap: (h2, 12)
      onClose("d1"): output("h2", 12)
    "L1" "d2" "h1" 1
    "L1" "d2" "h1" 1
      reduce("L1", "d2", "h1"): add to a 1-sized heap: (h1, 2)
      onClose("d2"): output("h1", 2)
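The rollup mechanics of this trace, `reduce()` firing per (`location`, `date`, `hashtag`) group and `onClose()` firing whenever (`location`, `date`) changes, can be reproduced with a small stand-alone simulation. The names (`TopOneRollupSim`, `Group`, `run`) are hypothetical; only the sorting guarantee described in the text is assumed:

```java
import java.util.ArrayList;
import java.util.List;

// Simulates the "top 1 per (location, date)" rollup trace above:
// the best hashtag is tracked per reduce() call, and the winner is
// flushed whenever the (location, date) subgroup closes.
public class TopOneRollupSim {

    // One already-aggregated (location, date, hashtag) group and its total.
    public record Group(String location, String date, String hashtag, int total) {}

    // groups must arrive sorted by (location, date, hashtag),
    // which is the order the reduce() calls would see.
    public static List<String> run(List<Group> groups) {
        List<String> output = new ArrayList<>();
        String bestTag = null;
        int bestCount = Integer.MIN_VALUE;
        for (int i = 0; i < groups.size(); i++) {
            Group g = groups.get(i);
            // reduce(): keep the best hashtag of the open (location, date)
            if (g.total() > bestCount) {
                bestTag = g.hashtag();
                bestCount = g.total();
            }
            boolean dateCloses = i == groups.size() - 1
                || !groups.get(i + 1).location().equals(g.location())
                || !groups.get(i + 1).date().equals(g.date());
            if (dateCloses) {
                // onClose(date): flush the winner and reset for the next subgroup
                output.add("output(" + bestTag + ", " + bestCount + ")");
                bestTag = null;
                bestCount = Integer.MIN_VALUE;
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> out = run(List.of(
            new Group("L1", "d1", "h1", 8),
            new Group("L1", "d1", "h2", 12),
            new Group("L1", "d2", "h1", 2)));
        System.out.println(out); // [output(h2, 12), output(h1, 2)]
    }
}
```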

Got the idea? By leveraging the way we are sorting the Tuples, we can *inject*
business logic every time a wider group opens or closes.

For this example, we would configure the job as:

    grouper.setGroupByFields("location", "date", "hashtag");
    grouper.setRollupFrom("date");

We would then need to use a special Reducer that extends `TupleRollupReducer`.

This Reducer will have extra methods, `onOpen()` and `onClose()`, where we’ll be able to add arbitrary business logic whenever any of the fields configured in *rollup from* changes its value.
In this example, we’ll be notified for the `date` and `hashtag` fields.

We’ll still be able to use the `reduce()` method for the (`location`, `date`, `hashtag`) groups.

You can see this example in TopNHashTags.java.

Note that when using rollup, the default partitioning strategy changes.

To keep things coherent, Pangool needs to partition by the fields that fall to the left of the `rollupFrom` field, including it.

In the example, Pangool will partition by (`location`, `date`).

If we rollup from `location`, then Pangool would just partition by `location`.
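The effect of partitioning by the prefix fields can be illustrated with a toy hash partitioner. The class, method, and hash choice here are assumptions for illustration, not Pangool’s actual partitioner implementation:

```java
import java.util.Objects;

// Toy partitioner that hashes only the (location, date) prefix,
// mimicking the rollup rule setRollupFrom("date") implies: `hashtag`
// is ignored, so every hashtag of the same (location, date) pair
// is routed to the same reducer.
public class RollupPartitionSim {

    public static int partition(String location, String date, int numReducers) {
        // floorMod keeps the partition non-negative even for negative hashes
        return Math.floorMod(Objects.hash(location, date), numReducers);
    }

    public static void main(String[] args) {
        int reducers = 4;
        int p1 = partition("L1", "d1", reducers);
        int p2 = partition("L1", "d1", reducers); // same prefix, any hashtag
        System.out.println(p1 == p2); // always true: same reducer
    }
}
```

Because the hashtag never enters the hash, `onClose("date")` is guaranteed to see every hashtag of its (location, date) subgroup on a single reducer.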

The wider the subgroups we create with *rollup*, the coarser the partition function becomes.

So rollup has to be used with care, in order not to create too-wide groups that could make reducer completion times deviate too much from one another.