Sending Data over ILP

Overview

The Sender class is a client that inserts rows into QuestDB via ILP (InfluxDB Line Protocol). It supports both ILP over TCP and the newer, recommended ILP over HTTP, as well as TLS and authentication.

from questdb.ingress import Sender, TimestampNanos
import pandas as pd

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    # One row at a time
    sender.row(
        'weather_sensor',
        symbols={'id': 'toronto1'},
        columns={'temperature': 23.5, 'humidity': 0.49},
        at=TimestampNanos.now())

    # Whole dataframes at once - MUCH FASTER
    df = pd.DataFrame({
        'id': ['dubai2', 'memphis7'],
        'temperature': [41.2, 33.3],
        'humidity': [0.34, 0.55],
        'timestamp': [
            pd.Timestamp('2021-01-01 12:00:00'),
            pd.Timestamp('2021-01-01 12:00:01')
        ]
    })
    sender.dataframe(
        df,
        table_name='weather_sensor',
        symbols=['id'],
        at='timestamp')

The Sender object holds an internal buffer which is flushed and sent when the with block ends.

You can read more on Preparing Data and Flushing.

Constructing the Sender

From Configuration

The Sender class is generally initialized from a configuration string.

from questdb.ingress import Sender

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    ...

See the Configuration guide for more details.

From Env Variable

You can also initialize the sender from an environment variable:

export QDB_CLIENT_CONF='http::addr=localhost:9000;'

The content of the environment variable is the same configuration string as taken by the Sender.from_conf method, but moving it to an environment variable is more secure and allows you to avoid hardcoding sensitive information such as passwords and tokens in your code.

from questdb.ingress import Sender

with Sender.from_env() as sender:
    ...
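
Because the configuration string may carry secrets, it can help to mask them before the string ever reaches a log line. The following is a minimal sketch using only the standard library. It assumes the simple `<protocol>::key=value;` layout shown above, does not handle escaped semicolons, and is not the client's own parser. The helper name `redact_conf` and the set of sensitive key names (`password`, `token`) are assumptions made for illustration.

```python
# Illustrative only: a simplified view of the configuration string layout
# ("<protocol>::key=value;key=value;"). Not the client's own parser.
SENSITIVE_KEYS = {'password', 'token'}  # assumed names of secret-bearing keys


def redact_conf(conf: str) -> str:
    """Return a copy of `conf` with sensitive values masked, for logging."""
    protocol, sep, params = conf.partition('::')
    if not sep:
        raise ValueError('expected "<protocol>::key=value;..."')
    redacted = []
    for pair in params.split(';'):
        if not pair:
            continue  # skip the empty piece after the trailing ';'
        key, _, value = pair.partition('=')
        if key in SENSITIVE_KEYS:
            value = '***'
        redacted.append(f'{key}={value}')
    return f'{protocol}::' + ''.join(f'{p};' for p in redacted)


print(redact_conf('http::addr=localhost:9000;username=admin;password=quest;'))
```

The unredacted string is still what you pass to the sender; the masked copy is only meant for diagnostics.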

Programmatic Construction

If you prefer, you can also construct the sender programmatically. See Programmatic Construction.

Preparing Data

Appending Rows

You can append as many rows as you like by calling the Sender.row method. The full method arguments are documented in the Buffer.row method.

Appending Pandas Dataframes

The sender can also append data from a Pandas dataframe.

This is orders of magnitude faster than appending rows one by one.

from questdb.ingress import Sender, IngressError

import sys
import pandas as pd


def example(host: str = 'localhost', port: int = 9000):
    df = pd.DataFrame({
            'pair': ['USDGBP', 'EURJPY'],
            'traded_price': [0.83, 142.62],
            'qty': [100, 400],
            'limit_price': [0.84, None],
            'timestamp': [
                pd.Timestamp('2022-08-06 07:35:23.189062', tz='UTC'),
                pd.Timestamp('2022-08-06 07:35:23.189062', tz='UTC')]})
    try:
        with Sender.from_conf(f"http::addr={host}:{port};") as sender:
            sender.dataframe(
                df,
                table_name='trades',  # Table name to insert into.
                symbols=['pair'],  # Columns to be inserted as SYMBOL types.
                at='timestamp')  # Column containing the designated timestamps.

    except IngressError as e:
        sys.stderr.write(f'Got error: {e}\n')


if __name__ == '__main__':
    example()

For more details see Sender.dataframe and for full argument options see Buffer.dataframe.

String vs Symbol Columns

QuestDB has a concept of symbols which are a more efficient way of storing categorical data (identifiers). Internally, symbols are deduplicated and stored as integers.

When sending data, you can specify a column as a symbol by using the symbols parameter of the row or dataframe methods.

Alternatively, if a column is expected to hold a collection of one-off strings, pass the value as a str in the columns parameter so that it is stored as a string rather than a symbol.

Here is an example of sending a row with a symbol and a string:

from questdb.ingress import Sender, TimestampNanos
import datetime

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    sender.row(
        'news',
        symbols={
            'category': 'sport'},
        columns={
            'headline': 'The big game',
            'url': 'https://dailynews.com/sport/the-big-game',
            'views': 1000},
        at=datetime.datetime(2021, 1, 1, 12, 0, 0))

Populating Timestamps

The at parameter of the row and dataframe methods is used to specify the timestamp of the rows.

Set by client

It can be either a TimestampNanos object or a datetime.datetime object.

In case of dataframes you can also specify the timestamp column name or index. If so, the column type should be a Pandas datetime64, with or without timezone information.

Note that all timestamps in QuestDB are stored as microseconds since the epoch, without timezone information. Any timezone information is dropped when the data is appended to the ILP buffer.
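
To make the note above concrete, here is a small standard-library sketch of the arithmetic involved. It assumes that dropping the timezone means storing the UTC instant, so two timezone-aware values that denote the same moment end up as the same integer. The helper `to_epoch_micros` is hypothetical and only illustrates the conversion; it is not the client's code.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def to_epoch_micros(dt: datetime.datetime) -> int:
    """Convert a timezone-aware datetime to microseconds since the epoch."""
    if dt.tzinfo is None:
        raise ValueError('this sketch only accepts timezone-aware datetimes')
    return (dt - EPOCH) // datetime.timedelta(microseconds=1)


utc = datetime.datetime(2021, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
plus2 = datetime.datetime(
    2021, 1, 1, 14, 0, 0,
    tzinfo=datetime.timezone(datetime.timedelta(hours=2)))

# Both values denote the same instant, so they map to the same integer.
assert to_epoch_micros(utc) == to_epoch_micros(plus2)
print(to_epoch_micros(utc))
```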

Set by server

If you prefer, you can specify at=ServerTimestamp which will instruct QuestDB to set the timestamp on your behalf for each row as soon as it’s received by the server.

from questdb.ingress import Sender, ServerTimestamp

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    sender.row(
        'weather_sensor',
        symbols={'id': 'toronto1'},
        columns={'temperature': 23.5, 'humidity': 0.49},
        at=ServerTimestamp)  # Legacy feature, not recommended.

Warning

Using ServerTimestamp is not recommended as it removes the ability for QuestDB to deduplicate rows and is considered a legacy feature.

Flushing

The sender accumulates data into an internal buffer. Calling Sender.flush will send the buffered data to QuestDB, and clear the buffer.

Flushing can be done explicitly or automatically.

Explicit Flushing

An explicit call to Sender.flush will send any pending data immediately.

from questdb.ingress import Sender, TimestampNanos

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    sender.row(
        'weather_sensor',
        symbols={'id': 'toronto1'},
        columns={'temperature': 23.5, 'humidity': 0.49},
        at=TimestampNanos.now())
    sender.flush()
    sender.row(
        'weather_sensor',
        symbols={'id': 'dubai2'},
        columns={'temperature': 41.2, 'humidity': 0.34},
        at=TimestampNanos.now())
    sender.flush()

Note that the last sender.flush() is entirely optional as flushing also happens at the end of the with block.

Auto-flushing

To avoid accumulating very large buffers, the sender will - by default - occasionally flush the buffer automatically.

Auto-flushing is triggered when:

  • appending a row to the internal sender buffer

  • and the buffer either:

    • Reaches 75,000 rows (for HTTP) or 600 rows (for TCP).

    • Hasn’t been flushed for 1 second (there are no timers).

Here is an example configuration string that sets up a sender to auto-flush every 10 rows and disables the interval-based auto-flushing logic.

http::addr=localhost:9000;auto_flush_rows=10;auto_flush_interval=off;

Here is a configuration string with auto-flushing completely disabled:

http::addr=localhost:9000;auto_flush=off;

See the Auto-flushing section for more details, and note that auto_flush_interval does NOT start a timer.
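
The "no timers" behaviour can be easier to see in a toy model. The sketch below is not the client's implementation. `AutoFlushModel` and its parameters are hypothetical names, and it only mirrors the rule described in this section: thresholds are checked at the moment a row is appended, so an idle buffer is never flushed just because time has passed.

```python
import time
from typing import Callable, List, Optional


class AutoFlushModel:
    """Toy model of the auto-flush checks; not the client's implementation."""

    def __init__(self, max_rows: Optional[int], interval_s: Optional[float],
                 clock: Callable[[], float] = time.monotonic):
        self.max_rows = max_rows        # None disables the row threshold
        self.interval_s = interval_s    # None disables the interval check
        self.clock = clock
        self.rows: List[str] = []
        self.last_flush = clock()
        self.flush_count = 0

    def flush(self) -> None:
        self.rows.clear()
        self.last_flush = self.clock()
        self.flush_count += 1

    def append(self, row: str) -> None:
        self.rows.append(row)
        # The checks run only here, while appending: no background timer.
        too_many = (self.max_rows is not None
                    and len(self.rows) >= self.max_rows)
        too_old = (self.interval_s is not None
                   and self.clock() - self.last_flush >= self.interval_s)
        if too_many or too_old:
            self.flush()


# Similar in spirit to auto_flush_rows=10;auto_flush_interval=off;
model = AutoFlushModel(max_rows=10, interval_s=None)
for i in range(25):
    model.append(f'row {i}')
print(model.flush_count, len(model.rows))
```

In this model the 25 appended rows produce two flushes, and the remaining rows stay buffered until the next flush, which in the real sender would happen at the latest when the with block ends.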

Error Reporting

TL;DR: Use HTTP for better error reporting

The sender will do its best to check for errors before sending data to the server.

When using the HTTP protocol, the server will send back an error message if the data is invalid or if there is a problem with the server. This will be raised as an IngressError exception.

The HTTP layer will also attempt retries, configurable via the retry_timeout parameter.

When using the TCP protocol, errors are not sent back from the server and must be searched for in the logs. See the Errors during flushing section for more details.

HTTP Transactions

When using the HTTP protocol, the sender can be configured to send a batch of rows as a single transaction.

Transactions are limited to a single table.

from questdb.ingress import Sender, TimestampNanos

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    with sender.transaction('weather_sensor') as txn:
        txn.row(
            symbols={'id': 'toronto1'},
            columns={'temperature': 23.5, 'humidity': 0.49},
            at=TimestampNanos.now())
        txn.row(
            symbols={'id': 'dubai2'},
            columns={'temperature': 41.2, 'humidity': 0.34},
            at=TimestampNanos.now())

If auto-flushing is enabled, any pending data will be flushed before the transaction is started.

Auto-flushing is disabled during the scope of the transaction.

The transaction is automatically completed at the end of the with block.

  • If there are no errors, the transaction is committed and sent to the server without delay.

  • If an exception is raised within the block, the transaction is rolled back and the exception is propagated.

You can also terminate a transaction explicitly by calling the commit or the rollback methods.

While transactions that span multiple tables are not supported by QuestDB, you can reuse the same sender for multiple tables.

You can also create parallel transactions by creating multiple sender objects across multiple threads.

Table and Column Auto-creation

When sending data to a table that does not exist, the server will create the table automatically.

This also applies to columns that do not exist.

The server will use the first row of data to determine the column types.

If the table already exists, the server will validate that the columns match the existing table.

If you’re using QuestDB Enterprise, you might need to grant further permissions to the authenticated user.

CREATE SERVICE ACCOUNT ingest;
GRANT ilp, create table TO ingest;
GRANT add column, insert ON all tables TO ingest;
--  OR
GRANT add column, insert ON table1, table2 TO ingest;

Read more setup details in the Enterprise quickstart and the role-based access control guides.

Advanced Usage

Independent Buffers

All examples so far have shown appending data to the sender’s internal buffer.

You can also create independent buffers and send them independently.

This is useful for more complex applications wishing to decouple the serialisation logic from the sending logic.

Note that the sender’s auto-flushing logic will not apply to independent buffers.

from questdb.ingress import Buffer, Sender, TimestampNanos

buf = Buffer()
buf.row(
    'weather_sensor',
    symbols={'id': 'toronto1'},
    columns={'temperature': 23.5, 'humidity': 0.49},
    at=TimestampNanos.now())
buf.row(
    'weather_sensor',
    symbols={'id': 'dubai2'},
    columns={'temperature': 41.2, 'humidity': 0.34},
    at=TimestampNanos.now())

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    sender.flush(buf, transaction=True)

The transaction parameter is optional and defaults to False. When set to True, the buffer is guaranteed to be committed as a single transaction, but must only contain rows for a single table.

Multiple Databases

Handling buffers explicitly is also useful when sending data to multiple databases via the .flush(buf, clear=False) option.

from questdb.ingress import Buffer, Sender, TimestampNanos

buf = Buffer()
buf.row(
    'weather_sensor',
    symbols={'id': 'toronto1'},
    columns={'temperature': 23.5, 'humidity': 0.49},
    at=TimestampNanos.now())

conf1 = 'http::addr=db1.host.com:9000;'
conf2 = 'http::addr=db2.host.com:9000;'
with Sender.from_conf(conf1) as sender1, Sender.from_conf(conf2) as sender2:
    # Send the same buffer to both databases without clearing it in between.
    sender1.flush(buf, clear=False)
    sender2.flush(buf, clear=False)

buf.clear()

This uses the clear=False parameter which otherwise defaults to True.

Threading Considerations

Neither the buffer API nor the sender object is thread-safe, but both can be shared between threads if you take care of exclusive access (such as using a lock) yourself.

Independent buffers also allow you to prepare separate buffers in different threads and then send them later through a single exclusively locked sender.

Alternatively you can also create multiple senders, one per thread.

Note that the questdb Python module is mostly implemented in native code and is designed to release the Python GIL whenever possible, so you can expect good performance in multi-threaded scenarios.

As an example, appending a dataframe to a buffer releases the GIL (unless any of the columns reference Python objects).

All network activity also fully releases the GIL.
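
The "prepare in parallel, send through one locked sender" pattern described in this section can be sketched as follows. To keep the example runnable without a server, it uses a hypothetical `StandInSender` that merely records what it is given, and plain lists in place of buffers. With the real client you would prepare independent buffers in each thread and call the sender's flush method inside the lock.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List


class StandInSender:
    """Hypothetical placeholder for a real sender; it only records batches."""

    def __init__(self) -> None:
        self.sent: List[List[str]] = []

    def flush(self, batch: List[str]) -> None:
        self.sent.append(list(batch))


sender = StandInSender()
sender_lock = threading.Lock()


def prepare_and_send(worker_id: int) -> None:
    # Each thread builds its own batch, so this step needs no locking.
    batch = [f'worker{worker_id}-row{i}' for i in range(3)]
    # Only the shared sender is guarded, and only while sending.
    with sender_lock:
        sender.flush(batch)


with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(prepare_and_send, range(4)))

print(len(sender.sent))
```

Keeping the locked region limited to the send call means the threads serialise only on the shared resource, not on data preparation.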

Optimising HTTP Performance

The sender’s network communication is implemented in native code and thus does not require access to the GIL, allowing for true parallelism when used from multiple threads.

For simplicity of design and best error feedback, the .flush() method blocks until the server has acknowledged the data.

If you need to send a large number of smaller requests (in other words, if you need to flush very frequently) or are in a high-latency network, you can significantly improve performance by creating and sending using multiple sender objects in parallel.

from questdb.ingress import Sender, TimestampNanos
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import datetime

def send_data(df):
    conf_string = 'http::addr=localhost:9000;'
    with Sender.from_conf(conf_string) as sender:
        sender.dataframe(
            df,
            table_name='weather_sensor',
            symbols=['id'],
            at='timestamp')

dfs = [
    pd.DataFrame({
        'id': ['sensor1', 'sensor2'],
        'temperature': [22.5, 24.7],
        'humidity': [0.45, 0.47],
        'timestamp': [
            pd.Timestamp('2017-01-01T12:00:00'),
            pd.Timestamp('2017-01-01T12:00:01')
        ]}),
    pd.DataFrame({
        'id': ['sensor3', 'sensor4'],
        'temperature': [23.1, 25.3],
        'humidity': [0.48, 0.50],
        'timestamp': [
            pd.Timestamp('2017-01-01T12:00:02'),
            pd.Timestamp('2017-01-01T12:00:03')
        ]})
]

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(send_data, df)
        for df in dfs]
    for future in futures:
        future.result()

For maximum performance you should also cache the sender objects and reuse them across multiple requests, since internally they maintain a connection pool.

Sender Lifetime Control

Instead of using a with Sender .. as sender: block you can also manually control the lifetime of the sender object.

from questdb.ingress import Sender

conf = 'http::addr=localhost:9000;'
sender = Sender.from_conf(conf)
sender.establish()
# ...
sender.close()

The establish method needs to be called exactly once, but the close method is idempotent and can be called multiple times.

Table and Column Names

The client will validate table and column names while constructing the buffer.

Table names and column names must not be empty and must adhere to the following:

Table Names

Cannot contain the following characters: ?, ,, ', ", \, /, :, ), (, +, *, %, ~, carriage return (\r), newline (\n), null character (\0), and Unicode characters from \u{0001} to \u{000F} and \u{007F}. Additionally, the Unicode character for zero-width no-break space (UTF-8 BOM, \u{FEFF}) is not allowed.

A dot (.) is allowed except at the start or end of the name, and cannot be consecutive (e.g., valid.name is valid, but .invalid, invalid., and in..valid are not).

Column Names

Cannot contain the following characters: ?, ., ,, ', ", \, /, :, ), (, +, -, *, %, ~, carriage return (\r), newline (\n), null character (\0), and Unicode characters from \u{0001} to \u{000F} and \u{007F}. Like table names, the Unicode character for zero-width no-break space (UTF-8 BOM, \u{FEFF}) is not allowed.

Unlike table names, a dot (.) is not allowed in column names at all.
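
If you want to reject bad names before they reach the client (for example, when names come from user input), the rules above can be approximated in a few lines. This is a sketch that mirrors only the rules listed in this section. The function names are hypothetical, and the client's own validation remains authoritative and may enforce further limits not covered here.

```python
# Characters \0 and \u0001-\u000F, plus \u007F and the BOM \uFEFF.
_CONTROL = {chr(c) for c in range(0x00, 0x10)} | {'\u007f', '\ufeff'}
_TABLE_FORBIDDEN = set('?,\'"\\/:)(+*%~') | _CONTROL
_COLUMN_FORBIDDEN = _TABLE_FORBIDDEN | {'.', '-'}


def is_valid_table_name(name: str) -> bool:
    """Check a table name against the rules listed in this section."""
    if not name or any(ch in _TABLE_FORBIDDEN for ch in name):
        return False
    # Dots may not lead, trail, or appear twice in a row.
    return not (name.startswith('.') or name.endswith('.') or '..' in name)


def is_valid_column_name(name: str) -> bool:
    """Check a column name against the rules listed in this section."""
    return bool(name) and not any(ch in _COLUMN_FORBIDDEN for ch in name)


for candidate in ['valid.name', '.invalid', 'invalid.', 'in..valid']:
    print(candidate, is_valid_table_name(candidate))
```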

Programmatic Construction

Sender Constructor

You can also specify the configuration parameters programmatically:

from questdb.ingress import Sender, Protocol
from datetime import timedelta

with Sender(Protocol.Tcp, 'localhost', 9009,
        auto_flush=True,
        auto_flush_interval=timedelta(seconds=10)) as sender:
    ...

See the Configuration section for a full list of configuration parameters. Each configuration parameter can be passed as a named argument to the constructor.

Python type mappings:

  • Parameters that require strings take a str.

  • Parameters that require numbers can also take an int.

  • Millisecond durations can take an int or a datetime.timedelta.

  • Any 'on' / 'off' / 'unsafe_off' parameters can also be specified as a bool.

  • Paths can also be specified as a pathlib.Path.
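
One way to read these mappings is to see how each Python value would look if written into a configuration string instead. The sketch below illustrates that correspondence with the standard library only. `to_conf_value` and `build_conf` are hypothetical helpers, not part of the client, and the example assumes that auto_flush_interval is one of the millisecond-duration parameters.

```python
from datetime import timedelta
from pathlib import Path


def to_conf_value(value) -> str:
    """Render a Python value as it would appear in a configuration string."""
    if isinstance(value, bool):  # check bool first: bool is an int subclass
        return 'on' if value else 'off'
    if isinstance(value, timedelta):
        return str(value // timedelta(milliseconds=1))  # whole milliseconds
    if isinstance(value, (int, str, Path)):
        return str(value)
    raise TypeError(f'unsupported value type: {type(value).__name__}')


def build_conf(protocol: str, host: str, port: int, **params) -> str:
    """Assemble "<protocol>::addr=host:port;key=value;" from Python values."""
    parts = [f'addr={host}:{port}'] + [
        f'{key}={to_conf_value(val)}' for key, val in params.items()]
    return f'{protocol}::' + ''.join(f'{p};' for p in parts)


print(build_conf('http', 'localhost', 9000,
                 auto_flush_rows=10,
                 auto_flush_interval=timedelta(seconds=10)))
```

In practice you would pass the Python values straight to the constructor; the string form is shown only to make the equivalence visible.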

Note

The constructor arguments have changed between 1.x and 2.x. If you are upgrading, take a look at the changelog.

Customising .from_conf() and .from_env()

If you want to further customise the behaviour of the .from_conf() or .from_env() methods, you can pass additional parameters to these methods. The parameters are the same as the ones for the Sender constructor, as documented above.

For example, here is a configuration string that is loaded from an environment variable and then customised to specify a 10 second auto-flush interval:

export QDB_CLIENT_CONF='http::addr=localhost:9000;'

from questdb.ingress import Sender, Protocol
from datetime import timedelta

with Sender.from_env(auto_flush_interval=timedelta(seconds=10)) as sender:
    ...

ILP/TCP or ILP/HTTP

The sender supports tcp, tcps, http, and https protocols.

You should prefer to use the new ILP/HTTP protocol instead of ILP/TCP in most cases as it provides better feedback on errors and transaction control.

ILP/HTTP is available from:

  • QuestDB 7.3.10 and later.

  • QuestDB Enterprise 1.2.7 and later.

Since TCP does not block for a response it is useful for high-throughput scenarios in higher latency networks or on older versions of QuestDB which do not support ILP/HTTP quite yet.

It should be noted that you can achieve equivalent or better performance to TCP with HTTP by using multiple sender objects in parallel.

Either way, you can easily switch between the two protocols by changing: