Examples

Basics

The following example connects to the database and sends two rows (lines).

The connection is unauthenticated and the data is sent at the end of the with block.

Here the questdb.ingress.Sender is constructed with just host and port.

from questdb.ingress import Sender, IngressError
import sys
import datetime


def example(host: str = 'localhost', port: int = 9009):
    """Send two example rows to the 'trades' table without authentication.

    Args:
        host: Address of the database to connect to.
        port: Port of the database to connect to.

    Any ``IngressError`` raised while connecting or sending is reported on
    stderr rather than propagated to the caller.
    """
    try:
        with Sender(host, port) as sender:
            # Record with an explicit designated timestamp (the 'at' param).
            # The designated timestamp is expected in nanoseconds, while
            # timestamps in other columns are expected in microseconds.
            # Passing datetime objects, as done here, avoids converting
            # the units by hand.
            sender.row(
                'trades',
                symbols={
                    'pair': 'USDGBP',
                    'type': 'buy'},
                columns={
                    'traded_price': 0.83,
                    'limit_price': 0.84,
                    'qty': 100,
                    'traded_ts': datetime.datetime(
                        2022, 8, 6, 7, 35, 23, 189062,
                        tzinfo=datetime.timezone.utc)},
                # Use a timezone-aware "now". utcnow() returns a naive
                # datetime, which is liable to be interpreted as local time
                # when converted to an epoch value, and it is deprecated
                # since Python 3.12.
                at=datetime.datetime.now(tz=datetime.timezone.utc))

            # If no 'at' param is passed, the server will use its own timestamp.
            sender.row(
                'trades',
                symbols={'pair': 'EURJPY'},
                columns={
                    'traded_price': 135.97,
                    'qty': 400,
                    'limit_price': None})  # NULL columns can be passed as None,
                                           # or simply be left out.

            # We recommend flushing periodically, for example every few seconds.
            # If you don't flush explicitly, the sender will flush automatically
            # once the buffer reaches 63KiB and just before the connection
            # is closed.
            sender.flush()

    except IngressError as e:
        sys.stderr.write(f'Got error: {e}\n')


if __name__ == '__main__':
    example()

Authentication and TLS

Continuing from the previous example, the connection is authenticated and also uses TLS.

Here the questdb.ingress.Sender is also constructed with the auth and tls arguments.

from questdb.ingress import Sender, IngressError
import sys
import datetime


def example(host: str = 'localhost', port: int = 9009):
    """Send two example rows to the 'trades' table over an authenticated TLS connection.

    Args:
        host: Address of the database to connect to.
        port: Port of the database to connect to.

    Any ``IngressError`` raised while connecting or sending is reported on
    stderr rather than propagated to the caller.
    """
    try:
        # See: https://questdb.io/docs/reference/api/ilp/authenticate
        # NOTE: sample key material for illustration only; do not hard-code
        # real credentials in source files.
        auth = (
            "testUser1",                                    # kid
            "5UjEMuA0Pj5pjK8a-fa24dyIf-Es5mYny3oE_Wmus48",  # d
            "fLKYEaoEb9lrn3nkwLDA-M_xnuFOdSt9y0Z7_vWSHLU",  # x
            "Dt5tbS1dEDMSYfym3fgMv0B99szno-dFc1rYF9t0aac")  # y
        with Sender(host, port, auth=auth, tls=True) as sender:
            # Record with an explicit designated timestamp (the 'at' param).
            # The designated timestamp is expected in nanoseconds, while
            # timestamps in other columns are expected in microseconds.
            # Passing datetime objects, as done here, avoids converting
            # the units by hand.
            sender.row(
                'trades',
                symbols={
                    'pair': 'USDGBP',
                    'type': 'buy'},
                columns={
                    'traded_price': 0.83,
                    'limit_price': 0.84,
                    'qty': 100,
                    'traded_ts': datetime.datetime(
                        2022, 8, 6, 7, 35, 23, 189062,
                        tzinfo=datetime.timezone.utc)},
                # Use a timezone-aware "now". utcnow() returns a naive
                # datetime, which is liable to be interpreted as local time
                # when converted to an epoch value, and it is deprecated
                # since Python 3.12.
                at=datetime.datetime.now(tz=datetime.timezone.utc))

            # If no 'at' param is passed, the server will use its own timestamp.
            sender.row(
                'trades',
                symbols={'pair': 'EURJPY'},
                columns={
                    'traded_price': 135.97,
                    'qty': 400,
                    'limit_price': None})  # NULL columns can be passed as None,
                                           # or simply be left out.

            # We recommend flushing periodically, for example every few seconds.
            # If you don't flush explicitly, the sender will flush automatically
            # once the buffer reaches 63KiB and just before the connection
            # is closed.
            sender.flush()

    except IngressError as e:
        sys.stderr.write(f'Got error: {e}\n')


if __name__ == '__main__':
    example()

Explicit Buffers

For more advanced use cases, such as sending the same messages to multiple QuestDB instances or decoupling serialization from sending (as may be the case in a multi-threaded application), construct questdb.ingress.Buffer objects explicitly, then pass them to the questdb.ingress.Sender.flush() method.

Note that this bypasses auto-flush logic (see questdb.ingress.Sender) and you are fully responsible for ensuring all data is sent.

from questdb.ingress import Sender, TimestampNanos


def example(host: str = 'localhost', port: int = 9009):
    """Fill a standalone buffer with two rows and send it in one flush.

    Args:
        host: Address of the database to connect to.
        port: Port of the database to connect to.
    """
    # (table, 'id' symbol, price, qty, designated timestamp in nanoseconds)
    row_specs = (
        ('line_sender_buffer_example', 'Hola', 111222233333, 3.5,
         111222233333),
        ('line_sender_example', 'Adios', 111222233343, 2.5,
         111222233343),
    )
    with Sender(host, port) as sender:
        pending = sender.new_buffer()
        for table, ident, price, qty, ts_nanos in row_specs:
            pending.row(
                table,
                symbols={'id': ident},
                columns={'price': price, 'qty': qty},
                at=TimestampNanos(ts_nanos))
        # Rows were written to the explicit buffer, so it has to be handed
        # to flush() for them to be sent.
        sender.flush(pending)


if __name__ == '__main__':
    example()

Ticking Random Data and Timer-based Flush

The following example somewhat mimics the behavior of a loop in an application.

It creates random ticking data at a random interval and flushes it explicitly based on a timer if the auto-flushing logic was not triggered recently.

from questdb.ingress import Sender
import random
import uuid
import time


def example(host: str = 'localhost', port: int = 9009):
    """Insert random rows into a uniquely named table until Ctrl^C is pressed.

    Rows are written at random intervals of up to 750 ms. The sender is
    created with a 1KiB auto-flush threshold, and an explicit flush is issued
    whenever more than five seconds have passed since the last observed flush.

    Args:
        host: Address of the database to connect to.
        port: Port of the database to connect to.
    """
    endpoints = ('ALPHA', 'BETA', 'OMEGA')
    max_flush_gap = 5  # seconds allowed between flushes
    table_name: str = str(uuid.uuid1())
    buffer_limit = 1024  # auto-flush once the internal buffer exceeds 1KiB
    with Sender(host=host, port=port, auto_flush=buffer_limit) as sender:
        rows_sent = 0
        flushed_at = time.monotonic()
        try:
            print("Ctrl^C to terminate...")
            while True:
                # Pause for up to 750 ms before producing the next row.
                pause_ms = random.randint(0, 750)
                time.sleep(pause_ms / 1000)

                print('Inserting row...')
                tags = {
                    'src': random.choice(endpoints),
                    'dst': random.choice(endpoints)}
                fields = {
                    'price': random.randint(200, 500),
                    'qty': random.randint(1, 5)}
                sender.row(table_name, symbols=tags, columns=fields)
                rows_sent += 1

                # An empty internal buffer right after adding a row means
                # the auto-flush has just fired.
                if not len(sender):
                    print('Auto-flush triggered.')
                    flushed_at = time.monotonic()

                # Guarantee a flush at least once every five seconds.
                if time.monotonic() - flushed_at > max_flush_gap:
                    print('Timer-flushing triggered.')
                    sender.flush()
                    flushed_at = time.monotonic()

        except KeyboardInterrupt:
            print(f"table: {table_name}, total rows sent: {rows_sent}")
            print("(wait commitLag for all rows to be available)")
            print("bye!")


if __name__ == '__main__':
    example()