.. _sender: ===================== Sending Data over ILP ===================== Overview ======== The :class:`Sender ` class is a client that inserts rows into QuestDB via the `ILP protocol `_, with support for both ILP over TCP and the newer and recommended ILP over HTTP. The sender also supports TLS and authentication. .. code-block:: python from questdb.ingress import Sender, TimestampNanos import pandas as pd conf = 'http::addr=localhost:9000;' with Sender.from_conf(conf) as sender: # One row at a time sender.row( 'weather_sensor', symbols={'id': 'toronto1'}, columns={'temperature': 23.5, 'humidity': 0.49}, at=TimestampNanos.now()) # Whole dataframes at once - MUCH FASTER df = pd.DataFrame({ 'id': ['dubai2', 'memphis7'], 'temperature': [41.2, 33.3], 'humidity': [0.34, 0.55], 'timestamp': [ pd.Timestamp('2021-01-01 12:00:00'), pd.Timestamp('2021-01-01 12:00:01') ] }) sensor.dataframe('weather_sensor', df, at='timestamp') The ``Sender`` object holds an internal buffer which will be flushed and sent at when the ``with`` block ends. You can read more on :ref:`sender_preparing_data` and :ref:`sender_flushing`. Constructing the Sender ======================= From Configuration ------------------ The ``Sender`` class is generally initialized from a :ref:`configuration string `. .. code-block:: python from questdb.ingress import Sender conf = 'http::addr=localhost:9000;' with Sender.from_conf(conf) as sender: ... See the :ref:`sender_conf` guide for more details. From Env Variable ----------------- You can also initialize the sender from an environment variable:: export QDB_CLIENT_CONF='http::addr=localhost:9000;' The content of the environment variable is the same :ref:`configuration string ` as taken by the :func:`Sender.from_conf ` method, but moving it to an environment variable is more secure and allows you to avoid hardcoding sensitive information such as passwords and tokens in your code. .. code-block:: python from questdb.ingress import Sender with Sender.from_env() as sender: ... Programmatic Construction ------------------------- If you prefer, you can also construct the sender programmatically. See :ref:`sender_programmatic_construction`. .. _sender_preparing_data: Preparing Data ============== Appending Rows -------------- You can append as many rows as you like by calling the :func:`Sender.row ` method. The full method arguments are documented in the :func:`Buffer.row ` method. Appending Pandas Dataframes --------------------------- The sender can also append data from a Pandas dataframe. This is `orders of magnitude `_ faster than appending rows one by one. .. literalinclude:: ../examples/pandas_basic.py :language: python For more details see :func:`Sender.dataframe ` and for full argument options see :func:`Buffer.dataframe `. String vs Symbol Columns ------------------------ QuestDB has a concept of symbols which are a more efficient way of storing categorical data (identifiers). Internally, symbols are deduplicated and stored as integers. When sending data, you can specify a column as a symbol by using the ``symbols`` parameter of the ``row`` or ``dataframe`` methods. Alternatively, if a column is expected to hold a collection of one-off strings, you can use the ``strings`` parameter. Here is an example of sending a row with a symbol and a string: .. code-block:: python from questdb.ingress import Sender, TimestampNanos import datetime conf = 'http::addr=localhost:9000;' with Sender.from_conf(conf) as sender: sender.row( 'news', symbols={ 'category': 'sport'}, columns={ 'headline': 'The big game', 'url': 'https://dailynews.com/sport/the-big-game', 'views': 1000}, at=datetime.datetime(2021, 1, 1, 12, 0, 0)) Populating Timestamps --------------------- The ``at`` parameter of the ``row`` and ``dataframe`` methods is used to specify the timestamp of the rows. Set by client ~~~~~~~~~~~~~ It can be either a :class:`TimestampNanos ` object or a `datetime.datetime `_ object. In case of dataframes you can also specify the timestamp column name or index. If so, the column type should be a Pandas ``datetime64``, with or without timezone information. Note that all timestamps in QuestDB are stored as microseconds since the epoch, without timezone information. Any timezone information is dropped when the data is appended to the ILP buffer. .. _sender_server_timestamp: Set by server ~~~~~~~~~~~~~ If you prefer, you can specify ``at=ServerTimestamp`` which will instruct QuestDB to set the timestamp on your behalf for each row as soon as it's received by the server. .. code-block:: python from questdb.ingress import Sender, ServerTimestamp conf = 'http::addr=localhost:9000;' with Sender.from_conf(conf) as sender: sender.row( 'weather_sensor', symbols={'id': 'toronto1'}, columns={'temperature': 23.5, 'humidity': 0.49}, at=ServerTimestamp) # Legacy feature, not recommended. .. warning:: Using ``ServerTimestamp`` is not recommended as it removes the ability for QuestDB to deduplicate rows and is considered a *legacy feature*. .. _sender_flushing: Flushing ======== The sender accumulates data into an internal buffer. Calling :func:`Sender.flush ` will send the buffered data to QuestDB, and clear the buffer. Flushing can be done explicitly or automatically. Explicit Flushing ----------------- An explicit call to :func:`Sender.flush ` will send any pending data immediately. .. code-block:: python conf = 'http::addr=localhost:9000;' with Sender.from_conf(conf) as sender: sender.row( 'weather_sensor', symbols={'id': 'toronto1'}, columns={'temperature': 23.5, 'humidity': 0.49}, at=TimestampNanos.now()) sender.flush() sender.row( 'weather_sensor', symbols={'id': 'dubai2'}, columns={'temperature': 41.2, 'humidity': 0.34}, at=TimestampNanos.now()) sender.flush() Note that the last `sender.flush()` is entirely optional as flushing also happens at the end of the ``with`` block. .. _sender_auto_flush: Auto-flushing ------------- To avoid accumulating very large buffers, the sender will - by default - occasionally flush the buffer automatically. Auto-flushing is triggered when: * appending a row to the internal sender buffer * and the buffer either: * Reaches 75'000 rows (for HTTP) or 600 rows (for TCP). * Hasn't been flushed for 1 second (there are no timers). Here is an example :ref:`configuration string ` that auto-flushes sets up a sender to flush every 10 rows and disables the interval-based auto-flushing logic. ``http::addr=localhost:9000;auto_flush_rows=10;auto_flush_interval=off;`` Here is a configuration string with auto-flushing completely disabled: ``http::addr=localhost:9000;auto_flush=off;`` See the :ref:`sender_conf_auto_flush` section for more details. and note that ``auto_flush_interval`` :ref:`does NOT start a timer `. Error Reporting =============== **TL;DR: Use HTTP for better error reporting** The sender will do its best to check for errors before sending data to the server. When using the HTTP protocol, the server will send back an error message if the data is invalid or if there is a problem with the server. This will be raised as an :class:`IngressError ` exception. The HTTP layer will also attempt retries, configurable via the :ref:`retry_timeout ` parameter.` When using the TCP protocol errors are *not* sent back from the server and must be searched for in the logs. See the :ref:`troubleshooting-flushing` section for more details. .. _sender_transaction: HTTP Transactions ================= When using the HTTP protocol, the sender can be configured to send a batch of rows as a single transaction. **Transactions are limited to a single table.** .. code-block:: python conf = 'http::addr=localhost:9000;' with Sender.from_conf(conf) as sender: with sender.transaction('weather_sensor') as txn: txn.row( symbols={'id': 'toronto1'}, columns={'temperature': 23.5, 'humidity': 0.49}, at=TimestampNanos.now()) txn.row( symbols={'id': 'dubai2'}, columns={'temperature': 41.2, 'humidity': 0.34}, at=TimestampNanos.now()) If auto-flushing is enabled, any pending data will be flushed before the transaction is started. Auto-flushing is disabled during the scope of the transaction. The transaction is automatically completed a the end of the ``with`` block. * If the there are no errors, the transaction is committed and sent to the server without delays. * If an exception is raised with the block, the transaction is rolled back and the exception is propagated. You can also terminate a transaction explicity by calling the :func:`commit ` or the :func:`rollback ` methods. While transactions that span multiple tables are not supported by QuestDB, you can reuse the same sender for mutliple tables. You can also create parallel transactions by creating multiple sender objects across multiple threads. .. _sender_auto_creation: Table and Column Auto-creation ============================== When sending data to a table that does not exist, the server will create the table automatically. This also applies to columns that do not exist. The server will use the first row of data to determine the column types. If the table already exists, the server will validate that the columns match the existing table. If you're using QuestDB enterprise you might need to grant further permissions to the authenticated user. .. code-block:: sql CREATE SERVICE ACCOUNT ingest; GRANT ilp, create table TO ingest; GRANT add column, insert ON all tables TO ingest; -- OR GRANT add column, insert ON table1, table2 TO ingest; Read more setup details in the `Enterprise quickstart `_ and the `role-based access control `_ guides. .. _sender_advanced: Advanced Usage ============== Independent Buffers ------------------- All examples so far have shown appending data to the sender's internal buffer. You can also create independent buffers and send them independently. This is useful for more complex applications whishing to decouple the serialisation logic from the sending logic. Note that the sender's auto-flushing logic will not apply to independent buffers. .. code-block:: python from questdb.ingress import Buffer, Sender, TimestampNanos buf = Buffer() buf.row( 'weather_sensor', symbols={'id': 'toronto1'}, columns={'temperature': 23.5, 'humidity': 0.49}, at=TimestampNanos.now()) buf.row( 'weather_sensor', symbols={'id': 'dubai2'}, columns={'temperature': 41.2, 'humidity': 0.34}, at=TimestampNanos.now()) conf = 'http::addr=localhost:9000;' with Sender.from_conf(conf) as sender: sender.flush(buf, transaction=True) The ``transaction`` parameter is optional and defaults to ``False``. When set to ``True``, the buffer is guaranteed to be committed as a single transaction, but must only contain rows for a single table. Multiple Databases ------------------ Handling buffers explicitly is also useful when sending data to multiple databases via the ``.flush(buf, clear=False)`` option. .. code-block:: python from questdb.ingress import Buffer, Sender, TimestampNanos buf = Buffer() buf.row( 'weather_sensor', symbols={'id': 'toronto1'}, columns={'temperature': 23.5, 'humidity': 0.49}, at=TimestampNanos.now()) conf1 = 'http::addr=db1.host.com:9000;' conf2 = 'http::addr=db2.host.com:9000;' with Sender.from_conf(conf1) as sender1, Sender.from_conf(conf2) as sender2: sender1.flush(buf1, clear=False) sender2.flush(buf2, clear=False) buf.clear() This uses the ``clear=False`` parameter which otherwise defaults to ``True``. Threading Considerations ------------------------ Neither buffer API nor the sender object are thread-safe, but can be shared between threads if you take care of exclusive access (such as using a lock) yourself. Independent buffers also allows you to prepare separate buffers in different threads and then send them later through a single exclusively locked sender. Alternatively you can also create multiple senders, one per thread. Notice that the ``questdb`` python module is mostly implemented in native code and is designed to release the Python GIL whenever possible, so you can expect good performance in multi-threaded scenarios. As an example, appending a dataframe to a buffer releases the GIL (unless any of the columns reference python objects). All network activity also fully releases the GIL. .. _sender_http_performance: Optimising HTTP Performance --------------------------- The sender's network communication is implemented in native code and thus does not require access to the GIL, allowing for true parallelism when used using multiple threads. For simplicity of design and best error feedback, the `.flush()` method blocks until the server has acknowledged the data. If you need to send a large number of smaller requests (in other words, if you need to flush very frequently) or are in a high-latency network, you can significantly improve performance by creating and sending using multiple sender objects in parallel. .. code-block:: python from questdb.ingress import Sender, TimestampNanos import pandas as pd from concurrent.futures import ThreadPoolExecutor import datetime def send_data(df): conf_string = 'http::addr=localhost:9000;' with Sender.from_conf(conf_string) as sender: sender.dataframe( df, table_name='weather_sensor', symbols=['id'], at='timestamp') dfs = [ pd.DataFrame({ 'id': ['sensor1', 'sensor2'], 'temperature': [22.5, 24.7], 'humidity': [0.45, 0.47], 'timestamp': [ pd.Timestamp('2017-01-01T12:00:00'), pd.Timestamp('2017-01-01T12:00:01') ]}), pd.DataFrame({ 'id': ['sensor3', 'sensor4'], 'temperature': [23.1, 25.3], 'humidity': [0.48, 0.50], 'timestamp': [ pd.Timestamp('2017-01-01T12:00:02'), pd.Timestamp('2017-01-01T12:00:03') ]}) ] with ThreadPoolExecutor() as executor: futures = [executor.submit(send_data, df) for df in dfs] for future in futures: future.result() For maxium performance you should also cache the sender objects and reuse them across multiple requests, since internally they maintain a connection pool. Sender Lifetime Control ----------------------- Instead of using a ``with Sender .. as sender:`` block you can also manually control the lifetime of the sender object. .. code-block:: python from questdb.ingress import Sender conf = 'http::addr=localhost:9000;' sender = Sender.from_conf(conf) sender.establish() # ... sender.close() The :func:`establish ` method is needs to be called exactly once, but the :func:`close ` method is idempotent and can be called multiple times. Table and Column Names ====================== The client will validate table and column names while constructing the buffer. Table names and column names must not be empty and must adhere to the following: Table Names ----------- Cannot contain the following characters: ``?``, ``,``, ``'``, ``"``, ``\``, ``/``, ``:``, ``)``, ``(``, ``+``, ``*``, ``%``, ``~``, carriage return (``\r``), newline (``\n``), null character (``\0``), and Unicode characters from ``\u{0001}`` to ``\u{000F}`` and ``\u{007F}``. Additionally, the Unicode character for zero-width no-break space (UTF-8 BOM, ``\u{FEFF}``) is not allowed. A dot (``.``) is allowed except at the start or end of the name, and cannot be consecutive (e.g., ``valid.name`` is valid, but ``.invalid``, ``invalid.``, and ``in..valid`` are not). Column Names ------------ Cannot contain the following characters: ``?``, ``.``, ``,``, ``'``, ``"``, ``\``, ``/``, ``:``, ``)``, ``(``, ``+``, ``-``, ``*``, ``%``, ``~``, carriage return (``\r``), newline (``\n``), null character (``\0``), and Unicode characters from ``\u{0001}`` to ``\u{000F}`` and ``\u{007F}``. Like table names, the Unicode character for zero-width no-break space (UTF-8 BOM, ``\u{FEFF}``) is not allowed. Unlike table names, a dot (``.``) is not allowed in column names at all. .. _sender_programmatic_construction: Programmatic Construction ========================= Sender Constructor ------------------ You can also specify the configuration parameters programmatically: .. code-block:: python from questdb.ingress import Sender, Protocol from datetime import timedelta with Sender(Protocol.Tcp, 'localhost', 9009, auto_flush=True, auto_flush_interval=timedelta(seconds=10)) as sender: ... See the :ref:`sender_conf` section for a full list of configuration parameters: each configuration parameter can be passed as named arguments to the constructor. Python type mappings: * Parameters that require strings take a ``str``. * Parameters that require numbers can also take an ``int``. * Millisecond durations can take an ``int`` or a ``datetime.timedelta``. * Any ``'on'`` / ``'off'`` / ``'unsafe_off'`` parameters can also be specified as a ``bool``. * Paths can also be specified as a ``pathlib.Path``. .. note:: The constructor arguments have changed between 1.x and 2.x. If you are upgrading, take a look at the :ref:`changelog `. Customising ``.from_conf()`` and ``.from_env()`` ------------------------------------------------ If you want to further customise the behaviour of the ``.from_conf()`` or ``.from_env()`` methods, you can pass additional parameters to these methods. The parameters are the same as the ones for the ``Sender`` constructor, as documented above. For example, here is a :ref:`configuration string ` that is loaded from an environment variable and then customised to specify a 10 second auto-flush interval:: export QDB_CLIENT_CONF='http::addr=localhost:9000;' .. code-block:: python from questdb.ingress import Sender, Protocol from datetime import timedelta with Sender.from_env(auto_flush_interval=timedelta(seconds=10)) as sender: ... .. _sender_which_protocol: ILP/TCP or ILP/HTTP =================== The sender supports ``tcp``, ``tcps``, ``http``, and ``https`` protocols. You should prefer to use the new ILP/HTTP protocol instead of ILP/TCP in most cases as it provides better feedback on errors and transaction control. ILP/HTTP is available from: * QuestDB 7.3.10 and later. * QuestDB Enterprise 1.2.7 and later. Since TCP does not block for a response it is useful for high-throughput scenarios in higher latency networks or on older versions of QuestDB which do not support ILP/HTTP quite yet. It should be noted that you can achieve equivalent or better performance to TCP with HTTP by :ref:`using multiple sender objects in parallel `. Either way, you can easily switch between the two protocols by changing: * The ```` part of the :ref:`configuration string `. * The port number (ILP/TCP default is 9009, ILP/HTTP default is 9000). * Any :ref:`authentication parameters ` such as ``username``, ``token``, et cetera.