Examples

Basics

Row-by-row Insertion

The following example connects to the database and sends two rows (lines).

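The connection is unauthenticated. Rows are sent when sender.flush() is called, and any rows still buffered are sent automatically just before the connection is closed at the end of the with block.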

Here the questdb.ingress.Sender is constructed with just host and port.

from questdb.ingress import Sender, IngressError
import sys
import datetime


def example(host: str = 'localhost', port: int = 9009):
    try:
        with Sender(host, port) as sender:
            # Record with a provided designated timestamp (using the 'at' param).
            # Note that the designated timestamp is expected in nanoseconds,
            # but timestamps in other columns are expected in microseconds.
            # The API converts datetime.datetime values to the right unit;
            # TimestampNanos and TimestampMicros wrap raw integer values.
            sender.row(
                'trades',
                symbols={
                    'pair': 'USDGBP',
                    'type': 'buy'},
                columns={
                    'traded_price': 0.83,
                    'limit_price': 0.84,
                    'qty': 100,
                    'traded_ts': datetime.datetime(
                        2022, 8, 6, 7, 35, 23, 189062,
                        tzinfo=datetime.timezone.utc)},
                # Use a timezone-aware datetime: utcnow() returns a naive value.
                at=datetime.datetime.now(datetime.timezone.utc))

            # If no 'at' param is passed, the server will use its own timestamp.
            sender.row(
                'trades',
                symbols={'pair': 'EURJPY'},
                columns={
                    'traded_price': 135.97,
                    'qty': 400,
                    'limit_price': None})  # NULL columns can be passed as None,
                                           # or simply be left out.

            # We recommend flushing periodically, for example every few seconds.
            # If you don't flush explicitly, the client will flush automatically
            # once the buffer reaches 63KiB and just before the connection
            # is closed.
            sender.flush()

    except IngressError as e:
        sys.stderr.write(f'Got error: {e}\n')


if __name__ == '__main__':
    example()
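The code comments above mention two timestamp units. The following minimal stdlib sketch makes them concrete. It assumes only what those comments state: the designated timestamp is an integer count of nanoseconds since the Unix epoch, and other timestamp columns use microseconds. The helpers `to_epoch_nanos` and `to_epoch_micros` are hypothetical and not part of `questdb.ingress`. The helpers reject naive datetimes, such as those returned by `datetime.datetime.utcnow()`, because a naive value carries no time zone and is easily misread as local time.

```python
import datetime

_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def _epoch_delta(dt: datetime.datetime) -> datetime.timedelta:
    # A naive datetime has no time zone, so its offset from the epoch is ambiguous.
    if dt.tzinfo is None or dt.utcoffset() is None:
        raise ValueError('expected a timezone-aware datetime')
    return dt - _EPOCH


def to_epoch_micros(dt: datetime.datetime) -> int:
    """Microseconds since the Unix epoch (unit for non-designated timestamp columns)."""
    delta = _epoch_delta(dt)
    # Integer arithmetic avoids the float rounding of dt.timestamp().
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


def to_epoch_nanos(dt: datetime.datetime) -> int:
    """Nanoseconds since the Unix epoch (unit for the designated timestamp)."""
    # datetime has microsecond precision, so the last three digits are always zero.
    return to_epoch_micros(dt) * 1_000


traded_ts = datetime.datetime(2022, 8, 6, 7, 35, 23, 189062,
                              tzinfo=datetime.timezone.utc)
print(to_epoch_micros(traded_ts))  # 1659771323189062
print(to_epoch_nanos(traded_ts))   # 1659771323189062000
```

A datetime with a non-UTC offset yields the same values as its UTC equivalent, because aware subtraction accounts for the offset.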

Authentication and TLS

Continuing from the previous example, the connection is authenticated and also uses TLS.

Here the questdb.ingress.Sender is also constructed with the auth and tls arguments.

from questdb.ingress import Sender, IngressError
import sys
import datetime


def example(host: str = 'localhost', port: int = 9009):
    try:
        # See: https://questdb.io/docs/reference/api/ilp/authenticate
        auth = (
            "testUser1",                                    # kid
            "5UjEMuA0Pj5pjK8a-fa24dyIf-Es5mYny3oE_Wmus48",  # d
            "fLKYEaoEb9lrn3nkwLDA-M_xnuFOdSt9y0Z7_vWSHLU",  # x
            "Dt5tbS1dEDMSYfym3fgMv0B99szno-dFc1rYF9t0aac")  # y
        with Sender(host, port, auth=auth, tls=True) as sender:
            # Record with a provided designated timestamp (using the 'at' param).
            # Note that the designated timestamp is expected in nanoseconds,
            # but timestamps in other columns are expected in microseconds.
            # The API converts datetime.datetime values to the right unit;
            # TimestampNanos and TimestampMicros wrap raw integer values.
            sender.row(
                'trades',
                symbols={
                    'pair': 'USDGBP',
                    'type': 'buy'},
                columns={
                    'traded_price': 0.83,
                    'limit_price': 0.84,
                    'qty': 100,
                    'traded_ts': datetime.datetime(
                        2022, 8, 6, 7, 35, 23, 189062,
                        tzinfo=datetime.timezone.utc)},
                # Use a timezone-aware datetime: utcnow() returns a naive value.
                at=datetime.datetime.now(datetime.timezone.utc))

            # If no 'at' param is passed, the server will use its own timestamp.
            sender.row(
                'trades',
                symbols={'pair': 'EURJPY'},
                columns={
                    'traded_price': 135.97,
                    'qty': 400,
                    'limit_price': None})  # NULL columns can be passed as None,
                                           # or simply be left out.

            # We recommend flushing periodically, for example every few seconds.
            # If you don't flush explicitly, the client will flush automatically
            # once the buffer reaches 63KiB and just before the connection
            # is closed.
            sender.flush()

    except IngressError as e:
        sys.stderr.write(f'Got error: {e}\n')


if __name__ == '__main__':
    example()
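The four strings in the auth tuple are labelled with JSON Web Key (JWK) field names. kid is the key identifier, d is the private key, and x and y are the public key coordinates, each base64url-encoded without padding. The sketch below shows a hypothetical helper, auth_from_jwk, which is not part of questdb.ingress. It builds such a tuple from a JWK-style dict and sanity-checks it, assuming an EC P-256 key, whose private scalar and coordinates are each 32 bytes long.

```python
import base64


def _b64url_len(value: str) -> int:
    # JWK values omit the '=' padding; restore it before decoding.
    padded = value + '=' * (-len(value) % 4)
    return len(base64.urlsafe_b64decode(padded))


def auth_from_jwk(jwk: dict) -> tuple:
    """Hypothetical helper: build a (kid, d, x, y) tuple from a JWK-style dict."""
    auth = (jwk['kid'], jwk['d'], jwk['x'], jwk['y'])
    for name, value in zip(('d', 'x', 'y'), auth[1:]):
        # Assumes an EC P-256 key: each component decodes to exactly 32 bytes.
        if _b64url_len(value) != 32:
            raise ValueError(f'JWK field {name!r} is not a 32-byte value')
    return auth


jwk = {
    'kty': 'EC', 'crv': 'P-256', 'kid': 'testUser1',
    'd': '5UjEMuA0Pj5pjK8a-fa24dyIf-Es5mYny3oE_Wmus48',
    'x': 'fLKYEaoEb9lrn3nkwLDA-M_xnuFOdSt9y0Z7_vWSHLU',
    'y': 'Dt5tbS1dEDMSYfym3fgMv0B99szno-dFc1rYF9t0aac',
}
auth = auth_from_jwk(jwk)
print(auth[0])  # testUser1
```

The resulting tuple has the same shape as the auth value passed to Sender(host, port, auth=auth, tls=True) in the example.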

Explicit Buffers

For more advanced use cases, such as sending the same messages to multiple QuestDB instances or decoupling serialization from sending (as may be needed in a multi-threaded application), construct questdb.ingress.Buffer objects explicitly, then pass them to the questdb.ingress.Sender.flush() method.

Note that this bypasses auto-flush logic (see questdb.ingress.Sender) and you are fully responsible for ensuring all data is sent.

from questdb.ingress import Sender, TimestampNanos


def example(host: str = 'localhost', port: int = 9009):
    with Sender(host, port) as sender:
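        # Rows go into this explicit buffer, not the sender's internal one,
        # so auto-flush does not apply to them.
        buffer = sender.new_buffer()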
        buffer.row(
            'line_sender_buffer_example',
            symbols={'id': 'Hola'},
            columns={'price': 111222233333, 'qty': 3.5},
            at=TimestampNanos(111222233333))
        buffer.row(
            'line_sender_example',
            symbols={'id': 'Adios'},
            columns={'price': 111222233343, 'qty': 2.5},
            at=TimestampNanos(111222233343))
        # Nothing is sent until the buffer is flushed explicitly.
        sender.flush(buffer)


if __name__ == '__main__':
    example()
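A buffer accumulates rows as lines of text in the InfluxDB Line Protocol (ILP) format. The protocol is referenced by the /api/ilp/ documentation linked earlier. The hypothetical format_line helper below roughly shows how the first row above could be rendered. It assumes the usual ILP conventions: symbols follow the table name, integers carry an i suffix, and the designated timestamp comes last, in nanoseconds. The helper does no escaping of special characters and supports only a few value types, so it is a sketch and not a substitute for Buffer.

```python
def format_line(table: str, symbols: dict, columns: dict, at_nanos: int) -> str:
    """Hypothetical, simplified ILP serializer (no escaping; bool/int/float/str only)."""
    def fmt(value):
        if isinstance(value, bool):  # check bool first: bool is a subclass of int
            return 't' if value else 'f'
        if isinstance(value, int):
            return f'{value}i'  # ILP integers carry an 'i' suffix
        if isinstance(value, float):
            return repr(value)
        if isinstance(value, str):
            return '"' + value + '"'  # string fields are double-quoted
        raise TypeError(f'unsupported type: {type(value).__name__}')

    # Table name and symbols form the first, comma-separated section.
    head = ','.join([table] + [f'{k}={v}' for k, v in symbols.items()])
    fields = ','.join(f'{k}={fmt(v)}' for k, v in columns.items())
    return f'{head} {fields} {at_nanos}\n'


line = format_line(
    'line_sender_buffer_example',
    symbols={'id': 'Hola'},
    columns={'price': 111222233333, 'qty': 3.5},
    at_nanos=111222233333)
print(line, end='')
# line_sender_buffer_example,id=Hola price=111222233333i,qty=3.5 111222233333
```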

Ticking Random Data and Timer-based Flush

The following example somewhat mimics the behavior of a loop in an application.

It creates random ticking data at a random interval and flushes it explicitly based on a timer if the auto-flushing logic was not triggered recently.

from questdb.ingress import Sender
import random
import uuid
import time


def example(host: str = 'localhost', port: int = 9009):
    table_name: str = str(uuid.uuid1())
    watermark = 1024  # Flush if the internal buffer exceeds 1KiB
    with Sender(host=host, port=port, auto_flush=watermark) as sender:
        total_rows = 0
        last_flush = time.monotonic()
        try:
            print("Ctrl^C to terminate...")
            while True:
                time.sleep(random.randint(0, 750) / 1000)  # sleep up to 750 ms

                print('Inserting row...')
                sender.row(
                    table_name,
                    symbols={
                        'src': random.choice(('ALPHA', 'BETA', 'OMEGA')),
                        'dst': random.choice(('ALPHA', 'BETA', 'OMEGA'))},
                    columns={
                        'price': random.randint(200, 500),
                        'qty': random.randint(1, 5)})
                total_rows += 1

                # If the internal buffer is empty, auto-flush was triggered.
                if len(sender) == 0:
                    print('Auto-flush triggered.')
                    last_flush = time.monotonic()

                # Flush at least once every five seconds.
                if time.monotonic() - last_flush > 5:
                    print('Timer-flushing triggered.')
                    sender.flush()
                    last_flush = time.monotonic()

        except KeyboardInterrupt:
            print(f"table: {table_name}, total rows sent: {total_rows}")
            print("(wait for commitLag to elapse before all rows are visible)")
            print("bye!")


if __name__ == '__main__':
    example()
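The loop above flushes on two triggers: a size watermark, handled by the client through auto_flush, and an elapsed-time check, handled by the application. The sketch below isolates that decision in a hypothetical FlushPolicy class, which is not part of questdb.ingress. It takes an injectable clock so the logic can be exercised without a server.

```python
import time
from typing import Callable


class FlushPolicy:
    """Hypothetical helper: flush when the buffer exceeds a size watermark
    or when too much time has passed since the last flush."""

    def __init__(self, watermark: int = 1024, interval: float = 5.0,
                 clock: Callable[[], float] = time.monotonic):
        self.watermark = watermark  # bytes, like auto_flush=watermark above
        self.interval = interval    # seconds between timer-based flushes
        self.clock = clock
        self.last_flush = clock()

    def should_flush(self, buffered_bytes: int) -> bool:
        if buffered_bytes == 0:
            return False  # nothing pending, so skip the flush entirely
        if buffered_bytes > self.watermark:
            return True
        return self.clock() - self.last_flush > self.interval

    def flushed(self) -> None:
        # Call after every flush, whether size- or timer-triggered.
        self.last_flush = self.clock()


fake_now = 0.0
policy = FlushPolicy(watermark=1024, interval=5.0, clock=lambda: fake_now)
print(policy.should_flush(100))   # False: small buffer, recent flush
fake_now = 6.0
print(policy.should_flush(100))   # True: more than 5 s since the last flush
policy.flushed()
print(policy.should_flush(2048))  # True: watermark exceeded
```

Unlike the loop above, this sketch skips timer flushes when nothing is buffered, which avoids sending empty flushes.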

Data Frames

Pandas Basics

The following example shows how to insert data from a Pandas DataFrame into the 'trades' table.

from questdb.ingress import Sender, IngressError

import sys
import pandas as pd


def example(host: str = 'localhost', port: int = 9009):
    df = pd.DataFrame({
            'pair': ['USDGBP', 'EURJPY'],
            'traded_price': [0.83, 142.62],
            'qty': [100, 400],
            'limit_price': [0.84, None],
            'timestamp': [
                pd.Timestamp('2022-08-06 07:35:23.189062', tz='UTC'),
                pd.Timestamp('2022-08-06 07:35:23.189062', tz='UTC')]})
    try:
        with Sender(host, port) as sender:
            sender.dataframe(
                df,
                table_name='trades',  # Table name to insert into.
                symbols=['pair'],  # Columns to be inserted as SYMBOL types.
                at='timestamp')  # Column containing the designated timestamps.

    except IngressError as e:
        sys.stderr.write(f'Got error: {e}\n')


if __name__ == '__main__':
    example()

For details on all options, see the questdb.ingress.Buffer.dataframe() method.
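Conceptually, sender.dataframe() turns each DataFrame row into one table row, much as one sender.row() call per row would. The stdlib sketch below mimics that mapping on a plain dict of columns. It assumes that the symbols and at arguments behave as the comments in the example describe, and that missing values become NULL, that is, they are left out of the row. The dataframe_to_rows helper is hypothetical and only illustrates the shape of the data.

```python
import datetime


def dataframe_to_rows(data: dict, symbols: list, at: str) -> list:
    """Hypothetical sketch: split column-oriented data into (symbols, columns, at) rows."""
    names = list(data)
    n_rows = len(data[names[0]])
    rows = []
    for i in range(n_rows):
        row_symbols = {name: data[name][i] for name in symbols}
        row_columns = {
            name: data[name][i] for name in names
            if name not in symbols and name != at
            and data[name][i] is not None}  # None means NULL: leave it out
        rows.append((row_symbols, row_columns, data[at][i]))
    return rows


ts = datetime.datetime(2022, 8, 6, 7, 35, 23, 189062, tzinfo=datetime.timezone.utc)
data = {
    'pair': ['USDGBP', 'EURJPY'],
    'traded_price': [0.83, 142.62],
    'qty': [100, 400],
    'limit_price': [0.84, None],
    'timestamp': [ts, ts]}
for row_symbols, row_columns, at in dataframe_to_rows(data, ['pair'], 'timestamp'):
    print(row_symbols, row_columns, at.isoformat())
```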

pd.Categorical and multiple tables

The next example shows some more advanced features for inserting data from Pandas.

  • The data is sent to multiple tables.

  • It uses pd.Categorical columns both to determine the table to insert into and to hold the sensor name.

  • Columns of type pd.Categorical are sent as SYMBOL types.

  • The at parameter is specified using a column index: -1 is the last column.

from questdb.ingress import Sender, IngressError

import sys
import pandas as pd


def example(host: str = 'localhost', port: int = 9009):
    df = pd.DataFrame({
            'metric': pd.Categorical(
                ['humidity', 'temp_c', 'voc_index', 'temp_c']),
            'sensor': pd.Categorical(
                ['paris-01', 'london-02', 'london-01', 'paris-01']),
            'value': [
                0.83, 22.62, 100.0, 23.62],
            'ts': [
                pd.Timestamp('2022-08-06 07:35:23.189062'),
                pd.Timestamp('2022-08-06 07:35:23.189062'),
                pd.Timestamp('2022-08-06 07:35:23.189062'),
                pd.Timestamp('2022-08-06 07:35:23.189062')]})
    try:
        with Sender(host, port) as sender:
            sender.dataframe(
                df,
                table_name_col='metric',  # Table name from 'metric' column.
                symbols='auto',  # Category columns as SYMBOL. (Default)
                at=-1)  # Last column contains the designated timestamps.

    except IngressError as e:
        sys.stderr.write(f'Got error: {e}\n')


if __name__ == '__main__':
    example()

After running this example, the rows will be split across the 'humidity', 'temp_c' and 'voc_index' tables.

For details on all options, see the questdb.ingress.Buffer.dataframe() method.
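The split across tables can be pictured as grouping rows by their value in the 'metric' column. The at=-1 argument resolves to the last column name, following the usual Python negative-index rule. The stdlib sketch below reproduces that split for the example data using a hypothetical group_by_table helper. It does not model how the client serializes each row.

```python
from collections import defaultdict

columns = ['metric', 'sensor', 'value', 'ts']
rows = [
    ('humidity', 'paris-01', 0.83, '2022-08-06 07:35:23.189062'),
    ('temp_c', 'london-02', 22.62, '2022-08-06 07:35:23.189062'),
    ('voc_index', 'london-01', 100.0, '2022-08-06 07:35:23.189062'),
    ('temp_c', 'paris-01', 23.62, '2022-08-06 07:35:23.189062'),
]


def group_by_table(columns, rows, table_name_col, at):
    """Hypothetical sketch: route each row to the table named in table_name_col."""
    name_idx = columns.index(table_name_col)
    at_name = columns[at] if isinstance(at, int) else at  # at=-1 -> last column
    tables = defaultdict(list)
    for row in rows:
        tables[row[name_idx]].append(row)
    return at_name, dict(tables)


at_name, tables = group_by_table(columns, rows, table_name_col='metric', at=-1)
print(at_name)  # ts
for table, table_rows in sorted(tables.items()):
    print(table, len(table_rows))  # humidity 1, temp_c 2, voc_index 1
```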

Loading Pandas from a Parquet File

The following example shows how to load a Pandas DataFrame from a Parquet file.

The example also relies on the dataframe’s index name to determine the table name.

from questdb.ingress import Sender
import pandas as pd


def write_parquet_file():
    df = pd.DataFrame({
        'location': pd.Categorical(
            ['BP-5541', 'UB-3355', 'SL-0995', 'BP-6653']),
        'provider': pd.Categorical(
            ['BP Pulse', 'Ubitricity', 'Source London', 'BP Pulse']),
        'speed_kwh': pd.Categorical(
            [50, 7, 7, 120]),
        'connector_type': pd.Categorical(
            ['Type 2 & 2+CCS', 'Type 1 & 2', 'Type 1 & 2', 'Type 2 & 2+CCS']),
        'current_type': pd.Categorical(
            ['dc', 'ac', 'ac', 'dc']),
        'price_pence':
            [54, 34, 32, 59],
        'in_use':
            [True, False, False, True],
        'ts': [
            pd.Timestamp('2022-12-30 12:15:00'),
            pd.Timestamp('2022-12-30 12:16:00'),
            pd.Timestamp('2022-12-30 12:18:00'),
            pd.Timestamp('2022-12-30 12:19:00')]})
    name = 'ev_chargers'
    df.index.name = name  # We set the dataframe's index name here!
    filename = f'{name}.parquet'
    df.to_parquet(filename)
    return filename


def example(host: str = 'localhost', port: int = 9009):
    filename = write_parquet_file()

    df = pd.read_parquet(filename)
    with Sender(host, port) as sender:
        # Note: Table name is looked up from the dataframe's index name.
        sender.dataframe(df, at='ts')


if __name__ == '__main__':
    example()

For details on all options, see the questdb.ingress.Buffer.dataframe() method.