Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 4 additions & 43 deletions folksonomy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
User,
ValueCount,
)
from .utils import get_current_user, check_owner_user
from .knowledge_panels import router as knowledge_panels_router


description = """
Expand Down Expand Up @@ -71,6 +73,8 @@ async def app_lifespan(app: FastAPI):
lifespan=app_lifespan,
)

app.include_router(knowledge_panels_router)

# Allow anyone to call the API from their own apps
app.add_middleware(
CORSMiddleware,
Expand Down Expand Up @@ -117,56 +121,13 @@ async def hello():
return {"message": "Hello folksonomy World! Tip: open /docs for documentation"}


async def get_current_user(token: str = Depends(oauth2_scheme)):
"""
Get current user and check token validity if present
"""
if token and "__U" in token:
cur = db.cursor()
await cur.execute(
"UPDATE auth SET last_use = current_timestamp AT TIME ZONE 'GMT' WHERE token = %s",
(token,),
)
if cur.rowcount == 1:
return User(user_id=token.split("__U", 1)[0])
else:
return User(user_id=None)


def sanitize_data(k, v):
"""Some sanitization of data"""
k = k.strip()
v = v.strip() if v else v
return k, v


def check_owner_user(user: User, owner, allow_anonymous=False):
"""
Check authentication depending on current user and 'owner' of the data
"""
user = user.user_id if user is not None else None
if user is None and not allow_anonymous:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required",
headers={"WWW-Authenticate": "Bearer"},
)
if owner != "":
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required for '%s'" % owner,
headers={"WWW-Authenticate": "Bearer"},
)
if owner != user:
raise HTTPException(
status_code=422,
detail="owner should be '%s' or '' for public, but '%s' is authenticated"
% (owner, user),
)
return


def get_auth_server(request: Request):
"""
Get auth server URL from request
Expand Down
113 changes: 113 additions & 0 deletions folksonomy/knowledge_panels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#! /usr/bin/python3

from fastapi import APIRouter, Response, Depends, HTTPException, Query
from typing import Optional
from . import db
from .utils import get_current_user, check_owner_user
from .models import (
User,
ProductKnowledgePanels,
TableElement,
TableColumn,
Element,
Panel,
TitleElement,
)

router = APIRouter()


@router.get(
"/product/{product}/knowledge-panels", response_model=ProductKnowledgePanels
)
async def product_barcode_knowledge_panels(
response: Response,
product: str,
k: Optional[str] = Query(None, description="Optional key to filter the results"),
owner="",
user: User = Depends(get_current_user),
):
"""
Return product information in a Knowledge Panel format.
If 'k' is provided, only that key is fetched.
If none is provided, all the keys are returned.
"""
check_owner_user(user, owner, allow_anonymous=True)

query = """
SELECT product, k, v, owner, version, editor, last_edit, comment
FROM folksonomy
WHERE product = %s
"""
params = [product]
if k:
query += " AND k = %s"
params.append(k)
query += " ORDER BY k;"

cur, timing = await db.db_exec(query, tuple(params))
rows = await cur.fetchall()

if not rows:
raise HTTPException(
status_code=404,
detail="Could not find product or key",
)

panels_by_key = {}
for row in rows:
product_value, k, v, _, _, _, _, _ = row

if k not in panels_by_key:
panels_by_key[k] = {"Barcode": product_value, "Key": k, "Value": v}
else:
data = panels_by_key[k]
if not data.get("Barcode") and product_value:
data["Barcode"] = product_value
if not data.get("Value") and v:
data["Value"] = v

panels = {}
for k, data in panels_by_key.items():
rows_html = ""
for property, value in data.items():
if value is None:
continue

# Links for Key and Value property
if property == "Key":
value_cell = (
f'<a href="world.openfoodfacts.org/property/{value}">{value}</a> '
f'<a href="wiki.openfoodfacts.org/Folksonomy/Property/{value}">&#8505;</a>'
)
elif property == "Value":
value_cell = f'<a href="world.openfoodfacts.org/property/{data["Key"]}/value/{value}">{value}</a>'
else:
value_cell = value

row_html = f"<tr><td>{property}</td><td>{value_cell}</td></tr>"
rows_html += row_html

table_html = f"<table>{rows_html}</table>"

table_element = TableElement(
id=k,
title=f"Folksonomy Data for '{k}'",
rows=table_html,
columns=[
TableColumn(type="text", text="Property"),
TableColumn(type="text", text="Value"),
],
)

element = Element(type="table", table_element=table_element)

panel = Panel(
title_element=TitleElement(title=f"Folksonomy Data for '{k}'", name=k),
elements=[element],
)
panels[k] = panel

response_obj = ProductKnowledgePanels(knowledge_panels=panels)
response.headers["x-pg-timing"] = timing
return response_obj
60 changes: 59 additions & 1 deletion folksonomy/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re
from datetime import datetime
from typing import Optional
from typing import Dict, Optional, List

from pydantic import BaseModel, field_validator

Expand Down Expand Up @@ -90,3 +90,61 @@ class ValueCount(BaseModel):

class PingResponse(BaseModel):
ping: str


class TitleElement(BaseModel):
"""
The title of a knowledge panel.
"""

name: str # Short name of this panel, not including any actual values
title: str # Human-readable title shown in the panel


class TableColumn(BaseModel):
"""
A column in a TableElement.
"""

type: str # Type of value for the values in column
text: str # Name of the column


class TableElement(BaseModel):
"""
Element to display a table in a knowledge panel.
"""

id: str # An id for the table
title: str # Title of the column
rows: str # Values to fill the rows
columns: List[TableColumn]


class Element(BaseModel):
"""
Each element object contains one specific element object such as a text element or an image element. For knowledge panels.
"""

type: str # The type of the included element object. The type also indicates which field contains the included element object. e.g. if the type is "text", the included element object will be in the "text_element" field.
table_element: TableElement


class Panel(BaseModel):
"""
Each knowledge panel contains an optional title and an optional array of elements.
"""

title_element: TitleElement
elements: List[Element]


class ProductKnowledgePanels(BaseModel):
"""
The knowledge panels object is a dictionary of individual panel objects.
Each key of the dictionary is the id of the panel, and the value is the panel object.

Apps typically display a number of root panels with known panel ids (e.g. health_card and environment_card). Panels can reference other panels and display them as sub-panels.
"""

knowledge_panels: Dict[str, Panel]
49 changes: 49 additions & 0 deletions folksonomy/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from fastapi import Depends, HTTPException, status
from . import db
from fastapi.security import OAuth2PasswordBearer
from .models import User

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth", auto_error=False)


async def get_current_user(token: str = Depends(oauth2_scheme)):
"""
Get current user and check token validity if present
"""
if token and "__U" in token:
cur = db.cursor()
await cur.execute(
"UPDATE auth SET last_use = current_timestamp AT TIME ZONE 'GMT' WHERE token = %s",
(token,),
)
if cur.rowcount == 1:
return User(user_id=token.split("__U", 1)[0])
else:
return User(user_id=None)


def check_owner_user(user: User, owner, allow_anonymous=False):
"""
Check authentication depending on current user and 'owner' of the data
"""
user = user.user_id if user is not None else None
if user is None and not allow_anonymous:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required",
headers={"WWW-Authenticate": "Bearer"},
)
if owner != "":
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required for '%s'" % owner,
headers={"WWW-Authenticate": "Bearer"},
)
if owner != user:
raise HTTPException(
status_code=422,
detail="owner should be '%s' or '' for public, but '%s' is authenticated"
% (owner, user),
)
return
32 changes: 32 additions & 0 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,38 @@ async def test_post_invalid(with_sample, client, auth_tokens):
)


def test_product_knowledge_panels(with_sample):
with TestClient(app) as client:
headers = {"Authorization": "Bearer foo__Utest-token"}
# all panels, no k to filter
response = client.get(f"/product/{BARCODE_1}/knowledge-panels", headers=headers)
assert response.status_code == 200
data = response.json()
assert "knowledge_panels" in data
# filtered for only one k
response = client.get(
f"/product/{BARCODE_1}/knowledge-panels?k=color", headers=headers
)
assert response.status_code == 200
data = response.json()
panels = data["knowledge_panels"]
assert len(panels) == 1
assert "color" in panels
assert "x-pg-timing" in response.headers
# non-existing product
response = client.get(
"/product/0000000000000/knowledge-panels", headers=headers
)
assert response.status_code == 404
data = response.json()
assert "Could not find product or key" in data["detail"]
# existing product but non-existing key
response = client.get(
f"/product/{BARCODE_1}/knowledge-panels?k=nonexistent", headers=headers
)
assert response.status_code == 404


@pytest.mark.asyncio
async def test_post(with_sample, client, auth_tokens):
headers = {"Authorization": "Bearer foo__Utest-token"}
Expand Down
Loading