Discussion:
Pushing down Joins, Aggregates and filters, and data distribution questions
Muhammad Gelbana
2017-06-01 12:17:34 UTC
First of all, I was very happy to finally attend the hangouts meeting; I've
been trying to do so for quite some time.

I know I confused most of you during the meeting, but that's because my
requirements aren't crystal clear at the moment and I'm still learning what
Drill can do. Hopefully I'll learn enough to be confident about the options
I have when I need to make implementation decisions.

Now to the point. Let me restate my case:

We have a proprietary datasource that can perform limits, aggregations,
filters and joins very fast. This datasource can handle SQL queries, but not
all possible SQL syntax. So far I've been successful in pushing down joins,
filters and limits, but I'm still struggling with aggregates. I've sent an
email about aggregates to Calcite's mailing list.

This datasource may be required to process billions of records and hundreds
of GBs of data, so we are looking to distribute this data among multiple
servers to overcome storage limitations and maximize throughput.

This distribution can be just duplicating the data to maximize throughput,
so each server holds the same set of data, *or* records may be distributed
among different servers without duplication, because a single server may
not be able to hold all the data. So some tables may be duplicated and some
tables may be distributed among servers. Let's assume that the distribution
details of each table are available to the plugin.

Now I understand that to execute a query, Drill supports a set of physical
operators <https://drill.apache.org/docs/physical-operators/>. These
operators' logic/code is generated at runtime and distributed among the
Drill cluster to be compiled and executed.

So to scan a table distributed/duplicated among 3 servers, I may want to
configure Drill to execute *SELECT * FROM TABLE* either by running the same
query with an extra filter (to read/scan a specific portion of the table if
the table is duplicated, to maximize throughput) or by running the query
without modifications but having Drill run it multiple times, once against
each server. I assume this can be done by having the table's *GroupScan*
return 3 different *SubScan*s when *GroupScan.getSpecificScan(int)* is
called multiple times, with different parameters of course.

These different parameters can be controlled through the output of
*GroupScan.getMinParallelizationWidth()* and
*GroupScan.getMaxParallelizationWidth()*, correct?
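
To make my assumption concrete, here is the kind of sketch I have in mind.
MyGroupScan, MySubScan and MyShardInfo are names I'm making up;
AbstractGroupScan, SubScan, getSpecificScan() and the two width methods are
Drill's actual API (constructors and serialization details vary between
Drill versions):

import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.SubScan;

public class MyGroupScan extends AbstractGroupScan {
  // One entry per server: either a full copy plus a range to read
  // (duplicated table) or a disjoint partition (distributed table).
  private final List<MyShardInfo> shards;

  public MyGroupScan(String userName, List<MyShardInfo> shards) {
    super(userName);
    this.shards = shards;
  }

  @Override
  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
    // Called once per minor fragment; each call yields a subscan
    // parameterized for a different server/portion of the table.
    return new MySubScan(shards.get(minorFragmentId));
  }

  @Override
  public int getMaxParallelizationWidth() {
    return shards.size(); // e.g. 3 for a table spread over 3 servers
  }

  @Override
  public int getMinParallelizationWidth() {
    return shards.size(); // ask for one minor fragment per server
  }

  // equals/digest/serialization/getNewWithChildren plumbing omitted
}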

Please correct me if I'm wrong about anything.

Now assuming what I said is correct, I have a couple of questions:

1. If I have multiple *SubScan*s to be executed, will each *SubScan* be
handled by a single *Scan* operator? So whenever I have *n* *SubScan*s,
I'll have *n* *Scan* operators distributed among Drill's cluster?
2. How can I control the number of physical operators of any type per
Drill cluster or node? For instance, what if I want fewer *Filter*
operators or more *Scan* operators, how can I do that?

I'm aware that my distribution goal may not be as simple as I may have made
it sound.

Pardon me for the huge email and thanks a lot for your time.

*---------------------*
*Muhammad Gelbana*
http://www.linkedin.com/in/mgelbana
rahul challapalli
2017-06-01 20:32:22 UTC
I would first recommend you spend some time reading about the execution
flow inside Drill [1]. Try to understand specifically what major/minor
fragments are, and that different major fragments can have different levels
of parallelism.

Let us take a simple query which runs on a 2-node cluster:

select * from employee where salary > 100000;

Now how do we control parallelism for the above query? Unfortunately the
generic answer is not a simple one. But since I conveniently took a simple
query with a single major fragment, let's make an effort to understand
this. There are 3 variables which control the parallelism:

1. Number of cores available
2. planner.width.max_per_node : the maximum number of minor fragments
within a major fragment, per node (see the note just after this list)
3. The parallelism supported by the scan for the particular storage plugin
involved
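
As a side note on (2): planner.width.max_per_node is an ordinary Drill
option, so it can be changed at runtime, e.g.

ALTER SESSION SET `planner.width.max_per_node` = 30;

(30 is just the value used in the example further down.)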

Let's try to understand the last parameter, which is the one of interest to
storage plugin developers. Like you hinted, the number of subscans
determines the parallelism of the above query in the absence of the first 2
variables. But how many subscans can exist? This unfortunately depends on
how you can split the data (while respecting row boundaries) and so is
dependent on the storage format. Hypothetically, let's say you have a file
which is composed of 100 parts, each part contains a few records, and you
know that a single record is not split across multiple parts. With this
setup, the storage plugin simply has to get the number of parts present in
the data and instantiate that many subscans.
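
One subtlety, sketched below with hypothetical names (partsPerFragment,
PartHandle, MySubScan): if Drill ends up creating fewer minor fragments
than there are parts, the GroupScan is told the final assignment through
its applyAssignments(...) callback and can then bundle several parts into a
single subscan:

// partsPerFragment would be computed in applyAssignments(...), once
// Drill tells the GroupScan how many minor fragments it actually got.
@Override
public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
  // A subscan may carry more than one part when parts > fragments.
  List<PartHandle> myParts = partsPerFragment.get(minorFragmentId);
  return new MySubScan(myParts);
}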

So in the above simplistic setup, the max parallelization that can be
achieved for the major fragment (and in effect the whole query) is
determined by the number of parts present in the data, which is 100. Now if
you do not set (2), the default max parallelization limit is 70% of the
number of cores available. If (2) is set by the user, that determines the
max threads that can be used per node. So for our example, the max
parallelization that can be supported is MIN(100,
planner.width.max_per_node × number of nodes). So if the user has
planner.width.max_per_node set to 30, then we end up with a total of 60
threads (on 2 nodes combined) which need to run 100 minor fragments.
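
Plugging in our numbers: MIN(100 parts, 30 per node × 2 nodes) =
MIN(100, 60) = 60.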

With this understanding, let's move to the next related topic, which is
"assignment". Now we have 60 threads (across 2 nodes) and 100 minor
fragments. So how do you assign minor fragments to specific nodes? This is
determined by the affinity that a particular node has for handling a
particular subscan, which the storage plugin can control through the
"public List<EndpointAffinity> getOperatorAffinity()" method in the
GroupScan class.
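
A minimal sketch of that override (EndpointAffinity, DrillbitEndpoint and
getOperatorAffinity() are Drill's API; dataHosts, a map from drillbit
endpoint to a weight, is hypothetical):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;

// Inside your GroupScan implementation:
@Override
public List<EndpointAffinity> getOperatorAffinity() {
  List<EndpointAffinity> affinities = new ArrayList<>();
  // Report a positive affinity for each drillbit co-located with data;
  // higher values make assignment to that node more likely.
  for (Map.Entry<DrillbitEndpoint, Double> e : dataHosts.entrySet()) {
    affinities.add(new EndpointAffinity(e.getKey(), e.getValue()));
  }
  return affinities;
}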

Now, to your questions:

1. If I have multiple *SubScan*s to be executed, will each *SubScan* be
handled by a single *Scan* operator? So whenever I have *n* *SubScan*s,
I'll have *n* *Scan* operators distributed among Drill's cluster?

I am not sure if I even understood your question correctly. Each minor
fragment gets executed in a single thread. In my example, each minor
fragment executes one subscan, followed by a project, a filter, etc. Read
[1] to understand more about this.

2. How can I control the number of physical operators of any type per
Drill cluster or node? For instance, what if I want fewer *Filter*
operators or more *Scan* operators, how can I do that?

I am not sure if we can control parallelism at the operator level within a
major fragment.

[1] https://drill.apache.org/docs/drill-query-execution/
Paul Rogers
2017-06-02 05:10:40 UTC
Hi Muhammad,
Post by Muhammad Gelbana
1. If I have multiple *SubScan*s to be executed, will each *SubScan* be
handled by a single *Scan* operator? So whenever I have *n* *SubScan*s,
I'll have *n* *Scan* operators distributed among Drill's cluster?
As Rahul explained, subscans are assigned to fragments. Let’s say that three were assigned to the same fragment. In this case, a single scan operator handles all three. Your “Scan Batch Creator” will create a separate “Record Reader” for each subscan and hand them to the scan operator. The scan operator then opens, reads, and closes each in turn.
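
In code, that pattern looks roughly like the sketch below. MySubScan,
MyReadEntry, MyRecordReader and getReadEntries() are placeholder names;
BatchCreator, ScanBatch and RecordReader are Drill classes, though their
exact signatures vary between Drill versions:

import java.util.ArrayList;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;

public class MyScanBatchCreator implements BatchCreator<MySubScan> {
  @Override
  public ScanBatch getBatch(FragmentContext context, MySubScan subScan,
                            List<RecordBatch> children) throws ExecutionSetupException {
    // One record reader per unit of work assigned to this fragment;
    // the single ScanBatch (scan operator) opens, reads, and closes
    // each reader in turn.
    List<RecordReader> readers = new ArrayList<>();
    for (MyReadEntry entry : subScan.getReadEntries()) {
      readers.add(new MyRecordReader(context, entry));
    }
    return new ScanBatch(subScan, context, readers.iterator());
  }
}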
Post by Muhammad Gelbana
2. How can I control the number of physical operators of any type per
Drill cluster or node? For instance, what if I want fewer *Filter*
operators or more *Scan* operators, how can I do that?
I’ve not seen anything that suggests that this is possible. Drill groups operators into fragments, then parallelizes the fragments. To accomplish what you want, you’d need to figure out how Drill slices the DAG into fragments and adjust the slicing to isolate the operators as you desire. Network exchanges join your custom fragments.

Parallelization is generic for all fragments as Rahul explained; I’ve seen nothing that suggests we have a way to identify different categories of fragments and apply different parallelization rules to each.

Maybe the
