
Apache Arrow Flight at first glance

In this post we will take a quick look at the functionality of the new Apache Arrow Flight framework.

For some time now there has been a new heavyweight contender for fast data access - Apache Arrow. Following in the footsteps of in-memory frameworks such as Spark, it builds on the trend towards columnar data formats that dominated the Big Data world over the last decade. It has made huge waves in a domain that had been stagnating a bit and looks set to remain one of the most commonly used tools. Although Arrow is an interesting thing to know in itself, there is another important reason to give this particular piece of technology a go - Apache Arrow Flight. This handy extension of the standard brings the same principles that constitute the foundation of Arrow to distributed systems, offering a lot of functionality out of the box and opening the way for mass adoption.

So what does Arrow itself actually offer?

According to the official webpage, Apache Arrow “is a cross-language development platform for in-memory data”.

Records are stored in a columnar format, which preserves locality and decreases query time substantially. The storage buffers are zero-copy and can be accessed through different language bindings. Since the data does not need to be swapped or copied, CPUs and GPUs can be utilized more efficiently. Memory access is the bottleneck of most modern architectures, and this is certainly a good way to alleviate it.

What is the Flight framework then?

Basically the same thing, but extended to a distributed scenario. Instead of accessing local memory, we can reference endpoints containing parts of the columnar data and optimize the way we operate on them. Arrow Flight uses the gRPC framework as its transport layer, which is highly efficient in itself, but what is really great is that it intercepts the calls to avoid unnecessary serialization and deserialization. You can read more about the framework in this blog post or in the documentation.

What needs to be said is that Flight is still under active development. The few bindings that exist (Python, Java, C++, etc.) often introduce breaking changes, and probably will for a while.

For this post we have chosen Python 3.6 with Apache Arrow Flight version 0.15.1. Please keep in mind that the examples might not work with other versions of the framework.

If for any reason you still haven’t tried using Conda to manage Python environments, we highly recommend trying it right now. The Miniconda installer is a nice place to start.

Here is a simple example of first steps after installation:

conda create -n flight-test python=3.6
conda activate flight-test
conda install pyarrow=0.15.1

You can install any other missing dependencies in a similar manner, test out the examples, and when you are done simply issue:

conda deactivate

Without further ado, let’s take Apache Arrow Flight out for a spin.

The first thing we must do is create a simple server that will respond to our requests. For now, let’s assume it contains only some static data:

import pyarrow as pa
import pyarrow.flight as fl

def create_table_int():
    data = [
        pa.array([1, 2, 3]),
        pa.array([4, 5, 6])
    ]
    return pa.Table.from_arrays(data, names=['column1', 'column2'])


def create_table_dict():
    keys = pa.array(["x", "y", "z"], type=pa.utf8())
    data = [
        pa.chunked_array([
            pa.DictionaryArray.from_arrays([0, 1, 2], keys),
            pa.DictionaryArray.from_arrays([0, 1, 2], keys)
        ]),
        pa.chunked_array([
            pa.DictionaryArray.from_arrays([1, 1, 1], keys),
            pa.DictionaryArray.from_arrays([2, 2, 2], keys)
        ])
    ]
    return pa.Table.from_arrays(data, names=['column1', 'column2'])

class FlightServer(fl.FlightServerBase):

    def __init__(self, location="grpc://0.0.0.0:8815", **kwargs):
        super(FlightServer, self).__init__(location, **kwargs)

        self.tables = {
            b'table_int': create_table_int(),
            b'table_dict': create_table_dict(),
        }

    def do_get(self, context, ticket):
        table = self.tables[ticket.ticket]
        return fl.RecordBatchStream(table)
        # return fl.GeneratorStream(table.schema, table.to_batches(max_chunksize=1024))

def main():
    FlightServer().serve()

if __name__ == '__main__':
    main()

A lot is happening here, so let’s dissect it a little bit.

We are creating and running an instance of FlightServer bound to the address “grpc://0.0.0.0:8815”. It is possible to use TLS and add an additional layer of authentication on top of that, but we do not cover that in this tutorial.

Two tables are accessible through the do_get(...) method: one containing integers and one with a dictionary structure. Flight allows us to create and serve records with any schema handled by the Arrow framework. Unfortunately, the documentation can be a bit lacking in that regard. We found that consulting the part related to integration testing can help to get a better picture of what is actually possible.

Right now we identify the referenced tables by a “ticket” name and return them as a record batch stream. It is also possible to serve the data in chunks of a specified size, turning this into a streaming API. Since it all runs on HTTP/2, it should prove to be very fast.

OK, let’s try consuming some of the data that we toiled to produce, by introducing a separate client script:

import argparse
import sys

import pyarrow as pa
import pyarrow.flight as fl

def get_by_ticket(args, client):
    ticket_name = args.name
    response = client.do_get(fl.Ticket(ticket_name)).read_all()
    print_response(response)

def get_by_ticket_pandas(args, client):
    ticket_name = args.name
    response = client.do_get(fl.Ticket(ticket_name)).read_pandas()
    print_response(response)

def print_response(data):
    print("=== Response ===")
    print(data)
    print("================")

def main():
    parser = argparse.ArgumentParser()
    subcommands = parser.add_subparsers()

    cmd_get_by_t = subcommands.add_parser('get_by_ticket')
    cmd_get_by_t.set_defaults(action='get_by_ticket')
    cmd_get_by_t.add_argument('-n', '--name', type=str, help="Name of the ticket to fetch.")

    cmd_get_by_tp = subcommands.add_parser('get_by_ticket_pandas')
    cmd_get_by_tp.set_defaults(action='get_by_ticket_pandas')
    cmd_get_by_tp.add_argument('-n', '--name', type=str, help="Name of the ticket to fetch.")

    args = parser.parse_args()
    if not hasattr(args, 'action'):
        parser.print_help()
        sys.exit(1)

    commands = {
        'get_by_ticket': get_by_ticket,
        'get_by_ticket_pandas': get_by_ticket_pandas,
    }

    client = fl.connect("grpc://0.0.0.0:8815")

    commands[args.action](args, client)


if __name__ == '__main__':
    main()

Once again, a large portion of code.

What we are essentially doing is creating a command-line interface that sends requests directly to our server. In a distributed scenario we could have separate nodes handling requests for schema and endpoint definitions, and nodes returning the actual data, but for now let’s focus on a case in which a single server responds to both types of calls.

Using the code provided above, we can get our data by issuing a command like:

python ./_client.py get_by_ticket -n table_int

You might have noticed that we have a second command that does a very similar thing - get_by_ticket_pandas. The only difference is that we get the data in a more “human-readable” form, thanks to the integration with the pandas library.

So what actually happened in this simple example? The client sent a request and allocated a buffer. The server recognized the ticket, accessed its own buffer, and created a gRPC response that used the buffer directly as the payload, skipping serialization. Finally, the client read the data, again skipping deserialization, and placed the response in its own buffer. We avoided copying the data at least two times, or four if we count serialization, which should make this faster than any alternative that serializes on every hop.

OK, now that we have a working request-response example, let’s take a step back and describe what we did in Flight terms by creating schema and endpoint definitions. It will be pretty straightforward for the endpoints, since we have only one server, and not that hard for the schema, because we can derive it from the existing tables:

    def get_flight_info(self, context, descriptor):
        ticket_name = b''.join(descriptor.path)
        if ticket_name in self.tables:
            table = self.tables[ticket_name]
            endpoints = [fl.FlightEndpoint(ticket_name, ["grpc://0.0.0.0:8815"])]
            return fl.FlightInfo(table.schema, descriptor, endpoints, table.num_rows, 0)

        raise KeyError("Unknown ticket name: {}".format(ticket_name))

    def get_schema(self, context, descriptor):
        info = self.get_flight_info(context, descriptor)
        return fl.SchemaResult(info.schema)

    def list_flights(self, context, criteria):
        for ticket_name in self.tables:
            descriptor = fl.FlightDescriptor.for_path(ticket_name)
            yield self.get_flight_info(context, descriptor)

The above example uses the “path” type descriptor to identify possible ticket names. Instead of querying them one by one, we can also list all the flights served by that particular endpoint.

As in the previous cases, it is now time for some client action:

def get_schema(args, client):
    path = args.path
    response = client.get_schema(fl.FlightDescriptor.for_path(path))
    print_response(response.schema)

def get_endpoints(args, client):
    path = args.path
    response = client.get_flight_info(fl.FlightDescriptor.for_path(path))
    print_response(response.endpoints)
    
def list_flights(args, client):
    response = client.list_flights()
    print("=== Response ===")
    for r in response:
        print(r.descriptor)
        print(r.schema)
        print(r.endpoints)
        print(r.total_records)
        print("================")

...

    cmd_get_schema = subcommands.add_parser('get_schema')
    cmd_get_schema.set_defaults(action='get_schema')
    cmd_get_schema.add_argument('-p', '--path', type=str, help="Descriptor path.")

    cmd_get_endpoints = subcommands.add_parser('get_endpoints')
    cmd_get_endpoints.set_defaults(action='get_endpoints')
    cmd_get_endpoints.add_argument('-p', '--path', type=str, help="Descriptor path.")

    cmd_list_flights = subcommands.add_parser('list_flights')
    cmd_list_flights.set_defaults(action='list_flights')

...

    commands = {
        'get_by_ticket': get_by_ticket,
        'get_by_ticket_pandas': get_by_ticket_pandas,
        'get_schema': get_schema,
        'get_endpoints': get_endpoints,
        'list_flights': list_flights,
    }

Usage is very similar as well; we just use the path parameter instead of the ticket name:

python ./_client.py get_schema -p table_int

We now have data that we can get and a way to find out what it is and how to access it. What is next?

It would probably be nice to have a way to put some data on the server:

    def do_put(self, context, descriptor, reader, writer):
        ticket_name = b''.join(descriptor.path)
        self.tables[ticket_name] = reader.read_all()

The client, in turn, has to contain:

def do_put(args, client):
    path = args.path
    values = args.values.split(',')

    table = pa.Table.from_arrays([pa.array(values)], names=['column1'])

    writer, _ = client.do_put(fl.FlightDescriptor.for_path(path), table.schema)
    writer.write_table(table, len(values))
    writer.close()

...

    cmd_do_put = subcommands.add_parser('do_put')
    cmd_do_put.set_defaults(action='do_put')
    cmd_do_put.add_argument('-p', '--path', type=str, help="Descriptor path.")
    cmd_do_put.add_argument('-v', '--values', type=str, help="Values to put on server.")

...

    commands = {
        'get_by_ticket': get_by_ticket,
        'get_by_ticket_pandas': get_by_ticket_pandas,
        'get_schema': get_schema,
        'get_endpoints': get_endpoints,
        'list_flights': list_flights,
        'do_put': do_put,
    }

In this example we take a list of comma-separated values and push them to the server as they are. This creates a single column of data and generates the appropriate schema, treating the values as text.

Here is how we can run this from the command line:

python ./_client.py do_put -p name_of_table -v a,b,c,x,y,z

Using all the examples together we have fully functional distributed data access, but Flight offers one more very useful feature - actions. These allow us to register arbitrarily chosen commands that the server will recognize, giving us additional ways to manipulate its state.

Here is how it can be handled on the server side:

    def list_actions(self, context):
        return [("greet", "returns greeting")]

    def do_action(self, context, action):
        if action.type == "greet":
            yield pa.flight.Result(b'Hello!')
        else:
            raise NotImplementedError("Unknown action: {}".format(action.type))

The list_actions method not only provides the list of all possible commands, but also lets us add some useful documentation about their usage.

Let’s try creating some client code that can interact with that:

def list_actions(args, client):
    response = client.list_actions()
    print_response(response)

def do_action(args, client):
    action_type = args.type
    response = client.do_action(pa.flight.Action(action_type, pa.allocate_buffer(0)))
    print("=== Response ===")
    for r in response:
        print(r.body.to_pybytes())
        print("================")

...

    cmd_list_actions = subcommands.add_parser('list_actions')
    cmd_list_actions.set_defaults(action='list_actions')

    cmd_do_action = subcommands.add_parser('do_action')
    cmd_do_action.set_defaults(action='do_action')
    cmd_do_action.add_argument('-t', '--type', type=str, help="Type of action.")

...

    commands = {
        'get_by_ticket': get_by_ticket,
        'get_by_ticket_pandas': get_by_ticket_pandas,
        'get_schema': get_schema,
        'get_endpoints': get_endpoints,
        'list_flights': list_flights,
        'do_put': do_put,
        'list_actions': list_actions,
        'do_action': do_action,
    }

We could, if we wanted, provide an additional payload for the action, but for now let’s stick to just saying “hello”:

python ./_client.py do_action -t greet

And that’s it.

In summary, we have a framework that handles a lot of low-level operations efficiently, allows us to integrate multiple languages, and even lets us create a heterogeneous server architecture, and yet interacting with it happens at a reasonably high level. Actions are just the icing on the cake.

Simplicity always wins in the long run, and Apache Arrow Flight seems like a serious contender for the data distribution championship.

The documentation can be a bit lacking at times and the project is at an early stage of development, but this is a piece of technology definitely worth trying out.