Sending Data over ILP¶
Overview¶
The Sender
class is a client that inserts
rows into QuestDB via the
ILP protocol, with
support for both ILP over TCP and the newer and recommended ILP over HTTP.
The sender also supports TLS and authentication.
from questdb.ingress import Sender, TimestampNanos
import pandas as pd
conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
# One row at a time
sender.row(
'weather_sensor',
symbols={'id': 'toronto1'},
columns={'temperature': 23.5, 'humidity': 0.49},
at=TimestampNanos.now())
# Whole dataframes at once - MUCH FASTER
df = pd.DataFrame({
'id': ['dubai2', 'memphis7'],
'temperature': [41.2, 33.3],
'humidity': [0.34, 0.55],
'timestamp': [
pd.Timestamp('2021-01-01 12:00:00'),
pd.Timestamp('2021-01-01 12:00:01')
]
})
    sender.dataframe(df, table_name='weather_sensor', symbols=['id'], at='timestamp')
The Sender
object holds an internal buffer which will be flushed and sent
when the with
block ends.
You can read more on Preparing Data and Flushing.
Constructing the Sender¶
From Configuration¶
The Sender
class is generally initialized from a
configuration string.
from questdb.ingress import Sender
conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
...
See the Configuration guide for more details.
From Env Variable¶
You can also initialize the sender from an environment variable:
export QDB_CLIENT_CONF='http::addr=localhost:9000;'
The content of the environment variable is the same
configuration string as taken by the
Sender.from_conf
method,
but moving it to an environment variable is more secure and allows you to avoid
hardcoding sensitive information such as passwords and tokens in your code.
from questdb.ingress import Sender
with Sender.from_env() as sender:
...
Programmatic Construction¶
If you prefer, you can also construct the sender programmatically. See Programmatic Construction.
Preparing Data¶
Appending Rows¶
You can append as many rows as you like by calling the
Sender.row
method. The full method arguments are
documented in the Buffer.row
method.
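As a quick sketch (the table and column names here are illustrative), a single row can mix symbol, float, integer, and boolean columns:
from questdb.ingress import Sender, TimestampNanos

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    sender.row(
        'trades',                        # table name
        symbols={'pair': 'USDGBP'},      # SYMBOL column(s)
        columns={
            'traded_price': 0.83,        # float column
            'qty': 100,                  # integer column
            'settled': False},           # boolean column
        at=TimestampNanos.now())         # designated timestamp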
Appending Pandas Dataframes¶
The sender can also append data from a Pandas dataframe.
This is orders of magnitude faster than appending rows one by one.
from questdb.ingress import Sender, IngressError
import sys
import pandas as pd
def example(host: str = 'localhost', port: int = 9000):
df = pd.DataFrame({
'pair': ['USDGBP', 'EURJPY'],
'traded_price': [0.83, 142.62],
'qty': [100, 400],
'limit_price': [0.84, None],
'timestamp': [
pd.Timestamp('2022-08-06 07:35:23.189062', tz='UTC'),
pd.Timestamp('2022-08-06 07:35:23.189062', tz='UTC')]})
try:
with Sender.from_conf(f"http::addr={host}:{port};") as sender:
sender.dataframe(
df,
table_name='trades', # Table name to insert into.
symbols=['pair'], # Columns to be inserted as SYMBOL types.
at='timestamp') # Column containing the designated timestamps.
except IngressError as e:
sys.stderr.write(f'Got error: {e}\n')
if __name__ == '__main__':
example()
For more details see Sender.dataframe
and for full argument options see
Buffer.dataframe
.
String vs Symbol Columns¶
QuestDB has a concept of symbols which are a more efficient way of storing categorical data (identifiers). Internally, symbols are deduplicated and stored as integers.
When sending data, you can specify a column as a symbol by using the
symbols
parameter of the row
or dataframe
methods.
Alternatively, if a column is expected to hold a collection of one-off strings,
you can use the strings
parameter.
Here is an example of sending a row with a symbol and a string:
from questdb.ingress import Sender
import datetime
conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
sender.row(
'news',
symbols={
'category': 'sport'},
columns={
'headline': 'The big game',
'url': 'https://dailynews.com/sport/the-big-game',
'views': 1000},
at=datetime.datetime(2021, 1, 1, 12, 0, 0))
Populating Timestamps¶
The at
parameter of the row
and dataframe
methods is used to specify
the timestamp of the rows.
Set by client¶
It can be either a TimestampNanos
object or a
datetime.datetime object.
In the case of dataframes you can also specify the timestamp column name or index.
If so, the column type should be a Pandas datetime64
, with or without
timezone information.
Note that all timestamps in QuestDB are stored as microseconds since the epoch, without timezone information. Any timezone information is dropped when the data is appended to the ILP buffer.
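For example, both accepted types can be used interchangeably; the timezone-aware datetime below is converted on append and its timezone information dropped:
import datetime
from questdb.ingress import Sender, TimestampNanos

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    # Nanosecond-precision timestamp, set by the client.
    sender.row(
        'weather_sensor',
        symbols={'id': 'toronto1'},
        columns={'temperature': 23.5},
        at=TimestampNanos.now())

    # Timezone-aware datetime: stored as UTC microseconds since the epoch.
    sender.row(
        'weather_sensor',
        symbols={'id': 'dubai2'},
        columns={'temperature': 41.2},
        at=datetime.datetime(2021, 1, 1, 12, 0, 0,
                             tzinfo=datetime.timezone.utc))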
Set by server¶
If you prefer, you can specify at=ServerTimestamp
which will instruct
QuestDB to set the timestamp on your behalf for each row as soon as it’s
received by the server.
from questdb.ingress import Sender, ServerTimestamp
conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
sender.row(
'weather_sensor',
symbols={'id': 'toronto1'},
columns={'temperature': 23.5, 'humidity': 0.49},
at=ServerTimestamp) # Legacy feature, not recommended.
Warning
Using ServerTimestamp
is not recommended as it removes the ability
for QuestDB to deduplicate rows and is considered a legacy feature.
Flushing¶
The sender accumulates data into an internal buffer. Calling
Sender.flush
will send the buffered data
to QuestDB, and clear the buffer.
Flushing can be done explicitly or automatically.
Explicit Flushing¶
An explicit call to Sender.flush
will
send any pending data immediately.
from questdb.ingress import Sender, TimestampNanos

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
sender.row(
'weather_sensor',
symbols={'id': 'toronto1'},
columns={'temperature': 23.5, 'humidity': 0.49},
at=TimestampNanos.now())
sender.flush()
sender.row(
'weather_sensor',
symbols={'id': 'dubai2'},
columns={'temperature': 41.2, 'humidity': 0.34},
at=TimestampNanos.now())
sender.flush()
Note that the last sender.flush() is entirely optional as flushing
also happens at the end of the with
block.
Auto-flushing¶
To avoid accumulating very large buffers, the sender will - by default - occasionally flush the buffer automatically.
Auto-flushing is triggered when appending a row to the internal sender buffer and the buffer either:
Reaches 75,000 rows (for HTTP) or 600 rows (for TCP).
Hasn't been flushed for over 1 second (this is checked when a row is appended; no timer is started).
Here is an example configuration string that sets up a sender to auto-flush every 10 rows and disables the interval-based auto-flushing logic:
http::addr=localhost:9000;auto_flush_rows=10;auto_flush_interval=off;
Here is a configuration string with auto-flushing completely disabled:
http::addr=localhost:9000;auto_flush=off;
See the Auto-flushing section for more details, and note that
auto_flush_interval
does NOT start a timer.
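As a sketch, with the row-based configuration above the sender flushes after every tenth appended row:
from questdb.ingress import Sender, TimestampNanos

conf = 'http::addr=localhost:9000;auto_flush_rows=10;auto_flush_interval=off;'
with Sender.from_conf(conf) as sender:
    for i in range(25):
        # Each .row() call checks the auto-flush thresholds:
        # here a flush is sent after rows 10 and 20.
        sender.row(
            'weather_sensor',
            symbols={'id': 'toronto1'},
            columns={'temperature': 20.0 + i},
            at=TimestampNanos.now())
    # The remaining 5 rows are flushed when the `with` block ends.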
Error Reporting¶
TL;DR: Use HTTP for better error reporting
The sender will do its best to check for errors before sending data to the server.
When using the HTTP protocol, the server will send back an error message if
the data is invalid or if there is a problem with the server. This will be
raised as an IngressError
exception.
The HTTP layer will also attempt retries, configurable via the retry_timeout parameter.
When using the TCP protocol errors are not sent back from the server and must be searched for in the logs. See the Errors during flushing section for more details.
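As a sketch of HTTP error handling (the retry_timeout value here is an illustrative 10 seconds, expressed in milliseconds):
import sys
from questdb.ingress import IngressError, Sender, TimestampNanos

conf = 'http::addr=localhost:9000;retry_timeout=10000;'
try:
    with Sender.from_conf(conf) as sender:
        sender.row(
            'weather_sensor',
            symbols={'id': 'toronto1'},
            columns={'temperature': 23.5, 'humidity': 0.49},
            at=TimestampNanos.now())
        sender.flush()
except IngressError as e:
    # Over HTTP, the server's error message is carried in the exception.
    sys.stderr.write(f'Ingestion failed: {e}\n')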
HTTP Transactions¶
When using the HTTP protocol, the sender can be configured to send a batch of rows as a single transaction.
Transactions are limited to a single table.
from questdb.ingress import Sender, TimestampNanos

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
with sender.transaction('weather_sensor') as txn:
txn.row(
symbols={'id': 'toronto1'},
columns={'temperature': 23.5, 'humidity': 0.49},
at=TimestampNanos.now())
txn.row(
symbols={'id': 'dubai2'},
columns={'temperature': 41.2, 'humidity': 0.34},
at=TimestampNanos.now())
If auto-flushing is enabled, any pending data will be flushed before the transaction is started.
Auto-flushing is disabled during the scope of the transaction.
The transaction is automatically completed at the end
of the with
block.
If there are no errors, the transaction is committed and sent to the server without delay.
If an exception is raised within the block, the transaction is rolled back and the exception is propagated.
You can also terminate a transaction explicitly by calling the
commit
or the
rollback
methods.
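Here is a minimal sketch of an explicit commit; calling rollback() instead would discard the buffered rows:
from questdb.ingress import Sender, TimestampNanos

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    with sender.transaction('weather_sensor') as txn:
        txn.row(
            symbols={'id': 'toronto1'},
            columns={'temperature': 23.5, 'humidity': 0.49},
            at=TimestampNanos.now())
        # Complete the transaction explicitly instead of waiting
        # for the end of the `with` block.
        txn.commit()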
While transactions that span multiple tables are not supported by QuestDB, you can reuse the same sender for multiple tables.
You can also create parallel transactions by creating multiple sender objects across multiple threads.
Table and Column Auto-creation¶
When sending data to a table that does not exist, the server will create the table automatically.
This also applies to columns that do not exist.
The server will use the first row of data to determine the column types.
If the table already exists, the server will validate that the columns match the existing table.
If you're using QuestDB Enterprise you might need to grant further permissions to the authenticated user.
CREATE SERVICE ACCOUNT ingest;
GRANT ilp, create table TO ingest;
GRANT add column, insert ON all tables TO ingest;
-- OR
GRANT add column, insert ON table1, table2 TO ingest;
Read more setup details in the Enterprise quickstart and the role-based access control guides.
Advanced Usage¶
Independent Buffers¶
All examples so far have shown appending data to the sender’s internal buffer.
You can also create independent buffers and send them independently.
This is useful for more complex applications wishing to decouple the serialisation logic from the sending logic.
Note that the sender’s auto-flushing logic will not apply to independent buffers.
from questdb.ingress import Buffer, Sender, TimestampNanos
buf = Buffer()
buf.row(
'weather_sensor',
symbols={'id': 'toronto1'},
columns={'temperature': 23.5, 'humidity': 0.49},
at=TimestampNanos.now())
buf.row(
'weather_sensor',
symbols={'id': 'dubai2'},
columns={'temperature': 41.2, 'humidity': 0.34},
at=TimestampNanos.now())
conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
sender.flush(buf, transaction=True)
The transaction
parameter is optional and defaults to False
.
When set to True
, the buffer is guaranteed to be committed as a single
transaction, but must only contain rows for a single table.
Multiple Databases¶
Handling buffers explicitly is also useful when sending data to multiple
databases via the .flush(buf, clear=False)
option.
from questdb.ingress import Buffer, Sender, TimestampNanos
buf = Buffer()
buf.row(
'weather_sensor',
symbols={'id': 'toronto1'},
columns={'temperature': 23.5, 'humidity': 0.49},
at=TimestampNanos.now())
conf1 = 'http::addr=db1.host.com:9000;'
conf2 = 'http::addr=db2.host.com:9000;'
with Sender.from_conf(conf1) as sender1, Sender.from_conf(conf2) as sender2:
    sender1.flush(buf, clear=False)
    sender2.flush(buf, clear=False)
buf.clear()
This uses the clear=False
parameter which otherwise defaults to True
.
Threading Considerations¶
Neither the buffer API nor the sender object is thread-safe, but they can be shared between threads if you take care of exclusive access (such as using a lock) yourself.
Independent buffers also allow you to prepare separate buffers in different threads and then send them later through a single exclusively locked sender.
Alternatively you can also create multiple senders, one per thread.
Notice that the questdb
Python module is mostly implemented in native code
and is designed to release the Python GIL whenever possible, so you can expect
good performance in multi-threaded scenarios.
As an example, appending a dataframe to a buffer releases the GIL (unless any of the columns reference Python objects).
All network activity also fully releases the GIL.
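As a sketch of the buffer-per-thread pattern described above, each worker serialises into its own Buffer and only the shared sender is guarded by a lock:
import threading
from questdb.ingress import Buffer, Sender, TimestampNanos

conf = 'http::addr=localhost:9000;'
lock = threading.Lock()

def worker(sender: Sender, sensor_id: str, temperature: float):
    # Each thread serialises into its own buffer: no locking needed here.
    buf = Buffer()
    buf.row(
        'weather_sensor',
        symbols={'id': sensor_id},
        columns={'temperature': temperature},
        at=TimestampNanos.now())
    # Only the shared sender requires exclusive access.
    with lock:
        sender.flush(buf)

with Sender.from_conf(conf) as sender:
    threads = [
        threading.Thread(target=worker, args=(sender, f'sensor{n}', 20.0 + n))
        for n in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()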
Optimising HTTP Performance¶
The sender's network communication is implemented in native code and thus does not require access to the GIL, allowing for true parallelism when using multiple threads.
For simplicity of design and best error feedback, the .flush() method blocks until the server has acknowledged the data.
If you need to send a large number of smaller requests (in other words, if you need to flush very frequently) or are on a high-latency network, you can significantly improve performance by creating multiple sender objects and sending through them in parallel.
from questdb.ingress import Sender
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
def send_data(df):
conf_string = 'http::addr=localhost:9000;'
with Sender.from_conf(conf_string) as sender:
sender.dataframe(
df,
table_name='weather_sensor',
symbols=['id'],
at='timestamp')
dfs = [
pd.DataFrame({
'id': ['sensor1', 'sensor2'],
'temperature': [22.5, 24.7],
'humidity': [0.45, 0.47],
'timestamp': [
pd.Timestamp('2017-01-01T12:00:00'),
pd.Timestamp('2017-01-01T12:00:01')
]}),
pd.DataFrame({
'id': ['sensor3', 'sensor4'],
'temperature': [23.1, 25.3],
'humidity': [0.48, 0.50],
'timestamp': [
pd.Timestamp('2017-01-01T12:00:02'),
pd.Timestamp('2017-01-01T12:00:03')
]})
]
with ThreadPoolExecutor() as executor:
futures = [executor.submit(send_data, df)
for df in dfs]
for future in futures:
future.result()
For maximum performance you should also cache the sender objects and reuse them across multiple requests, since internally they maintain a connection pool.
Sender Lifetime Control¶
Instead of using a with Sender .. as sender:
block you can also manually
control the lifetime of the sender object.
from questdb.ingress import Sender
conf = 'http::addr=localhost:9000;'
sender = Sender.from_conf(conf)
sender.establish()
# ...
sender.close()
The establish
method needs to be
called exactly once, but the close
method
is idempotent and can be called multiple times.
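If an exception may be raised between establish and close, a try/finally block (a sketch of the same pattern) ensures the sender is always closed:
from questdb.ingress import Sender, TimestampNanos

conf = 'http::addr=localhost:9000;'
sender = Sender.from_conf(conf)
sender.establish()
try:
    sender.row(
        'weather_sensor',
        symbols={'id': 'toronto1'},
        columns={'temperature': 23.5, 'humidity': 0.49},
        at=TimestampNanos.now())
    sender.flush()
finally:
    sender.close()  # idempotent, so safe to call even after an error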
Table and Column Names¶
The client will validate table and column names while constructing the buffer.
Table names and column names must not be empty and must adhere to the following:
Table Names¶
Cannot contain the following characters:
?  ,  '  "  \  /  :  )  (  +  *  %  ~
as well as carriage return (\r), newline (\n), the null character (\0), and Unicode characters from \u{0001} to \u{000F} and \u{007F}.
Additionally, the Unicode character for zero-width no-break space (UTF-8 BOM, \u{FEFF}) is not allowed.
A dot (.) is allowed except at the start or end of the name, and cannot be consecutive (e.g., valid.name is valid, but .invalid, invalid., and in..valid are not).
Column Names¶
Cannot contain the following characters:
?  .  ,  '  "  \  /  :  )  (  +  -  *  %  ~
as well as carriage return (\r), newline (\n), the null character (\0), and Unicode characters from \u{0001} to \u{000F} and \u{007F}.
Like table names, the Unicode character for zero-width no-break space (UTF-8 BOM, \u{FEFF}) is not allowed.
Unlike table names, a dot (.) is not allowed in column names at all.
Programmatic Construction¶
Sender Constructor¶
You can also specify the configuration parameters programmatically:
from questdb.ingress import Sender, Protocol
from datetime import timedelta
with Sender(Protocol.Tcp, 'localhost', 9009,
auto_flush=True,
auto_flush_interval=timedelta(seconds=10)) as sender:
...
See the Configuration section for a full list of configuration parameters: each configuration parameter can be passed as a named argument to the constructor.
Python type mappings:
Parameters that require strings take a str.
Parameters that require numbers can also take an int.
Millisecond durations can take an int or a datetime.timedelta.
Any 'on'/'off'/'unsafe_off' parameters can also be specified as a bool.
Paths can also be specified as a pathlib.Path.
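As a sketch combining several of these mappings (the parameter values are illustrative; see the Configuration section for each parameter's meaning):
from datetime import timedelta
from questdb.ingress import Sender, Protocol

with Sender(Protocol.Http, 'localhost', 9000,
            auto_flush=True,                            # 'on'/'off' given as a bool
            auto_flush_rows=1000,                       # number given as an int
            auto_flush_interval=timedelta(seconds=10),  # duration as a timedelta
            retry_timeout=5000) as sender:              # milliseconds as an int
    ...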
Note
The constructor arguments have changed between 1.x and 2.x. If you are upgrading, take a look at the changelog.
Customising .from_conf()
and .from_env()
¶
If you want to further customise the behaviour of the .from_conf()
or
.from_env()
methods, you can pass additional parameters to these methods.
The parameters are the same as the ones for the Sender
constructor, as
documented above.
For example, here is a configuration string that is loaded from an environment variable and then customised to specify a 10 second auto-flush interval:
export QDB_CLIENT_CONF='http::addr=localhost:9000;'
from questdb.ingress import Sender, Protocol
from datetime import timedelta
with Sender.from_env(auto_flush_interval=timedelta(seconds=10)) as sender:
...
ILP/TCP or ILP/HTTP¶
The sender supports tcp
, tcps
, http
, and https
protocols.
You should prefer to use the new ILP/HTTP protocol instead of ILP/TCP in most cases as it provides better feedback on errors and transaction control.
ILP/HTTP is available from:
QuestDB 7.3.10 and later.
QuestDB Enterprise 1.2.7 and later.
Since TCP does not block for a response, it is useful for high-throughput scenarios on higher-latency networks or with older versions of QuestDB that do not yet support ILP/HTTP.
Note that you can achieve performance equivalent to or better than TCP over HTTP by using multiple sender objects in parallel.
Either way, you can easily switch between the two protocols by changing:
The <protocol> part of the configuration string.
The port number (ILP/TCP default is 9009, ILP/HTTP default is 9000).
Any authentication parameters such as username, token, et cetera.
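For example (a sketch with placeholder addresses and no authentication), only the configuration string changes between the two transports:
from questdb.ingress import Sender

# ILP/HTTP (recommended): 'http' protocol, default port 9000.
http_conf = 'http::addr=localhost:9000;'

# ILP/TCP: 'tcp' protocol, default port 9009.
tcp_conf = 'tcp::addr=localhost:9009;'

# The sending code itself is unchanged.
with Sender.from_conf(http_conf) as sender:
    ...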