Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 97 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,101 @@
on: workflow_dispatch
name: Tests

on:
workflow_dispatch:
inputs:
super_ref:
description: 'brimdata/super branch, tag, or SHA to test against'
default: main
required: false
repository_dispatch:
types: [super-pr-merged]
schedule:
- cron: '5 8 * * *'
push:
branches: [main]
pull_request:

jobs:
test:
runs-on: ubuntu-latest
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v4
with:
repository: brimdata/super
ref: ${{ inputs.super_ref || 'main' }}
path: super
- uses: actions/setup-go@v5
with:
go-version-file: super/go.mod
cache-dependency-path: super/go.sum
- run: go build -o $GITHUB_WORKSPACE/bin/super ./cmd/super
working-directory: super
- name: Start super db serve (no auth)
run: |
$GITHUB_WORKSPACE/bin/super db -db $(mktemp -d) serve -l localhost:9867 &
for i in $(seq 1 30); do
curl -sf http://localhost:9867/status 2>/dev/null && break
sleep 1
done
- uses: actions/setup-python@v5
with:
python-version: '3.x'
- run: pip install '.[test]'
- run: pytest test_superdb.py
env:
SUPER_DB: http://localhost:9867
test-auth:
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v4
with:
repository: brimdata/super
ref: ${{ inputs.super_ref || 'main' }}
path: super
- uses: actions/setup-go@v5
with:
go-version-file: super/go.mod
cache-dependency-path: super/go.sum
- name: Build super and gentoken
run: |
go build -o $GITHUB_WORKSPACE/bin/super ./cmd/super
go build -o $GITHUB_WORKSPACE/bin/gentoken ./cmd/gentoken
working-directory: super
- name: Start super db serve (with auth)
run: |
$GITHUB_WORKSPACE/bin/super db -db $(mktemp -d) serve -l localhost:9867 \
-auth.enabled=t \
-auth.audience=a \
-auth.clientid=c \
-auth.domain=d \
-auth.jwkspath=$GITHUB_WORKSPACE/super/service/testdata/auth-public-jwks.json &
for i in $(seq 1 30); do
curl -sf http://localhost:9867/status 2>/dev/null && break
sleep 1
done
- name: Store auth token
run: |
token=$($GITHUB_WORKSPACE/bin/gentoken \
-audience a -domain d -keyid testkey \
-privatekeyfile $GITHUB_WORKSPACE/super/service/testdata/auth-private-key \
-tenantid t -userid u)
$GITHUB_WORKSPACE/bin/super db auth store -access "$token" -db http://localhost:9867
- uses: actions/setup-python@v5
with:
python-version: '3.x'
- run: pip install '.[test]'
- run: pytest test_superdb.py test_auth.py
env:
SUPER_DB: http://localhost:9867
SUPER_DB_AUTH: '1'
notify:
runs-on: ubuntu-24.04
needs: [test, test-auth]
if: failure() && github.ref_name == 'main'
steps:
- run: echo "placeholder"
- run: |
curl -s -X POST ${{ secrets.SLACK_WEBHOOK_BRIMLABS_TEST }} \
-H 'Content-type: application/json' \
--data '{"username":"superdb-python","text":"Python client tests failed: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"}'
27 changes: 25 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,25 @@
# superdb-python
A Python module for interacting with a persistent SuperDB database
# `superdb` Python Package

Visit <https://superdb.org/dev/libraries/python.html> for installation
instructions and example usage.

## Running the tests

Create and activate a virtual environment, install the package with its test
dependencies, and start a local SuperDB service:

```
python3 -m venv .venv
source .venv/bin/activate
pip3 install -e '.[test]'
super db -db $(mktemp -d) serve
```

Then in another shell (with the virtual environment activated):

```
source .venv/bin/activate
pytest
```

Tests are skipped automatically if the SuperDB service is not reachable.
22 changes: 22 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[build-system]
requires = ["setuptools", "setuptools-scm"]
build-backend = "setuptools.build_meta"

[project]
name = "superdb"
dynamic = ["version"]
dependencies = ["pyarrow", "requests"]
requires-python = ">=3.8"

[tool.setuptools]
py-modules = ["superdb"]

[project.optional-dependencies]
test = ["pytest"]

[tool.pytest.ini_options]
addopts = "-rs"

[tool.setuptools_scm]
fallback_version = "0+unknown"
version_scheme = "post-release"
162 changes: 162 additions & 0 deletions superdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import getpass
import json
import os
import os.path
import urllib.parse

import pyarrow as pa
import pyarrow.ipc
import requests


class Client():
def __init__(self,
base_url=os.environ.get('SUPER_DB', 'http://localhost:9867'),
config_dir=os.path.expanduser('~/.super')):
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
self.session.headers.update({'Accept': 'application/vnd.apache.arrow.stream'})
token = self.__get_auth_token(config_dir)
if token is not None:
self.session.headers.update({'Authorization': 'Bearer ' + token})

def __get_auth_token(self, config_dir):
creds_path = os.path.join(config_dir, 'credentials.json')
try:
with open(creds_path) as f:
data = f.read()
except FileNotFoundError:
return None
creds = json.loads(data)
if self.base_url in creds['services']:
return creds['services'][self.base_url]['access']
return None

def create_pool(self, name, layout={'order': 'desc', 'keys': [['ts']]},
thresh=0):
r = self.session.post(self.base_url + '/pool', json={
'name': name,
'layout': layout,
'thresh': thresh,
})
self.__raise_for_status(r)

def load(self, pool_name_or_id, data, branch_name='main',
commit_author=getpass.getuser(), commit_body='',
mime_type=None):
pool = urllib.parse.quote(pool_name_or_id, safe='')
branch = urllib.parse.quote(branch_name, safe='')
url = self.base_url + '/pool/' + pool + '/branch/' + branch
commit_message = {'author': commit_author, 'body': commit_body}
headers = {'SuperDB-Commit': json.dumps(commit_message)}
if mime_type is not None:
headers['Content-Type'] = mime_type
r = self.session.post(url, headers=headers, data=data)
self.__raise_for_status(r)

def delete_pool(self, pool_name_or_id):
pool = urllib.parse.quote(pool_name_or_id, safe='')
r = self.session.delete(self.base_url + '/pool/' + pool)
self.__raise_for_status(r)

def query(self, query, safe=True):
if safe:
# Pre-flight: verify all top-level values are records of a single
# type. Arrow requires top-level records and silently truncates on
# type changes, so we detect both problems before issuing the real
# query.
safety_r = self.query_raw(
query + ' | union(typeof(this)) by kind(this)',
headers={'Accept': 'application/x-ndjson'},
)
rows = [
json.loads(line)
for line in safety_r.iter_lines(decode_unicode=True)
if line
]
if rows:
if any(row['kind'] != 'record' for row in rows):
kinds = sorted({row['kind'] for row in rows})
raise NonRecordError(
f"Query result contains non-record values "
f"(kind: {', '.join(repr(k) for k in kinds)}). "
f"Arrow requires top-level records.",
kinds,
)
type_count = len(rows[0]['union'])
if type_count > 1:
raise MixedTypesError(
f'Query result contains {type_count} distinct types; results '
f'would be silently truncated. Use \'| blend\' to merge types '
f'into one, or pass safe=False to skip this check and accept '
f'partial results.',
type_count,
)
r = self.query_raw(query)
try:
reader = pa.ipc.open_stream(r.raw)
except pa.lib.ArrowInvalid as e:
# An empty response body (no schema) means either the pool has no
# data or the data contains a type the Arrow encoder can't handle
# (e.g. an empty record). Both cases are indistinguishable at the
# HTTP level when streaming, so both are silently treated as an
# empty result. Any other ArrowInvalid (wrong format, mid-stream
# corruption, etc.) is re-raised.
if 'null or length 0' in str(e):
return
raise
for batch in reader:
yield from batch.to_pylist(maps_as_pydicts='strict')

def query_raw(self, query, headers=None):
r = self.session.post(self.base_url + '/query', headers=headers,
json={'query': query}, stream=True)
self.__raise_for_status(r)
r.raw.decode_content = True
return r

@staticmethod
def __raise_for_status(response):
if response.status_code >= 400:
try:
error = response.json()['error']
except Exception:
response.raise_for_status()
else:
raise RequestError(error, response)


class RequestError(Exception):
"""Raised by Client methods when an HTTP request fails."""
def __init__(self, message, response):
super(RequestError, self).__init__(message)
self.response = response


class MixedTypesError(Exception):
"""Raised by query() when the result contains more than one distinct type."""
def __init__(self, message, type_count):
super().__init__(message)
self.type_count = type_count


class NonRecordError(Exception):
"""Raised by query() when the result contains non-record top-level values."""
def __init__(self, message, kinds):
super().__init__(message)
self.kinds = kinds


if __name__ == '__main__':
import argparse
import pprint

parser = argparse.ArgumentParser(
description='Query default SuperDB service and print results.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('query')
args = parser.parse_args()

c = Client()
for record in c.query(args.query):
pprint.pprint(record)
47 changes: 47 additions & 0 deletions test_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
Auth integration tests for the SuperDB Python client.
These tests require a SuperDB service running with authentication enabled
and valid credentials stored in ~/.super/credentials.json. In CI they
are run automatically by the test-auth GitHub Actions job. When run
manually with plain pytest they are skipped unless the SUPER_DB_AUTH
environment variable is set.
"""

import os
import uuid

import pytest
import requests

from superdb import Client, RequestError

if not os.environ.get('SUPER_DB_AUTH'):
pytest.skip('auth not configured (SUPER_DB_AUTH not set)', allow_module_level=True)

_BASE_URL = os.environ.get('SUPER_DB', 'http://localhost:9867').rstrip('/')
try:
requests.get(_BASE_URL + '/status', timeout=2)
except requests.exceptions.ConnectionError:
pytest.skip(
f'SuperDB service not reachable at {_BASE_URL}',
allow_module_level=True,
)


def test_authenticated_client_can_query():
client = Client()
name = 'test_auth_' + uuid.uuid4().hex[:8]
client.create_pool(name)
try:
client.load(name, b'{a: 1}', mime_type='application/x-sup')
assert list(client.query(f'from {name}')) == [{'a': 1}]
finally:
client.delete_pool(name)


def test_unauthenticated_client_raises_request_error():
# A client with no credentials (config_dir='') should be rejected by an
# auth-enabled service on any request.
with pytest.raises(RequestError):
Client(config_dir='').create_pool('x')
Loading
Loading