176 lines
5.2 KiB
Python
176 lines
5.2 KiB
Python
from datetime import date, datetime, timedelta, timezone
|
|
from typing import Annotated
|
|
from fastapi import Depends, FastAPI, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
|
import jwt
|
|
from jwt.exceptions import InvalidTokenError
|
|
from pydantic import BaseModel
|
|
from dotenv import load_dotenv
|
|
import os
|
|
import logging
|
|
import requests
|
|
import json
|
|
import numpy as np
|
|
import holidays
|
|
import calendar
|
|
from odoo_api.client import OdooAPIClient
|
|
|
|
# to get a string like this run:
|
|
# openssl rand -hex 32
|
|
|
|
load_dotenv()
|
|
SECRET_KEY = os.getenv('SECRET_KEY')
|
|
ALGORITHM = os.getenv('ALGORITHM')
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES'))
|
|
ODDO_URL = os.getenv('ODDO_URL')
|
|
DB_NAME = os.getenv('DB_NAME')
|
|
|
|
origins_str = os.getenv('ORIGINS', '')
|
|
origins = [origin.strip() for origin in origins_str.split(',') if origin.strip()]
|
|
|
|
class Token(BaseModel):
|
|
access_token: str
|
|
token_type: str
|
|
|
|
|
|
class User(BaseModel):
|
|
uid: str
|
|
full_name: str | None = None
|
|
email: str | None = None
|
|
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
|
logger = logging.getLogger("uvicorn")
|
|
OdooClient = OdooAPIClient(ODDO_URL, DB_NAME, SECRET_KEY, ALGORITHM)
|
|
|
|
app = FastAPI()
|
|
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
|
|
app = FastAPI()
|
|
|
|
logger.info(f"origins: {origins}")
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=origins,
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
def create_access_token(data: dict, expires_delta: timedelta | None = None):
|
|
to_encode = data.copy()
|
|
if expires_delta:
|
|
expire = datetime.now(timezone.utc) + expires_delta
|
|
else:
|
|
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
|
|
|
|
to_encode.update({"exp": expire})
|
|
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
return encoded_jwt
|
|
|
|
|
|
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> User:
|
|
credentials_exception = HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Could not validate credentials",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
uid = payload.get("uid")
|
|
email = payload.get("sub")
|
|
full_name = payload.get("full_name")
|
|
if uid is None or email is None:
|
|
raise credentials_exception
|
|
|
|
return User(uid=uid, email=email, full_name=full_name)
|
|
except InvalidTokenError:
|
|
raise credentials_exception
|
|
|
|
|
|
def odoo_api_login(username: str, password: str) -> User | None:
|
|
result = OdooClient.login(username, password)
|
|
if result.get('uid'):
|
|
return User(
|
|
uid=str(result.get('uid')),
|
|
full_name=result.get('name'),
|
|
email=result.get('username')
|
|
)
|
|
return None
|
|
|
|
def get_polish_holidays(year: int, month: int):
|
|
pl_holidays = holidays.Poland(years=year)
|
|
|
|
# Filtrowanie po miesiącu
|
|
holidays_list = [
|
|
{"date": date.strftime("%Y-%m-%d"), "name": name}
|
|
for date, name in sorted(pl_holidays.items())
|
|
if date.month == month
|
|
]
|
|
|
|
return holidays_list
|
|
|
|
def create_response(year: int, month: int, days: list) -> dict:
|
|
employee = days[0]["employee_id"][1]
|
|
days_in_month = calendar.monthrange(year, month)[1]
|
|
holidays = get_polish_holidays(year, month)
|
|
|
|
start = date(year, month, 1)
|
|
end = date(year, month, days_in_month) + timedelta(days=1)
|
|
|
|
# 4. Wyliczenie dni roboczych:
|
|
holiday_dates = [h['date'] for h in holidays]
|
|
working_days = int(np.busday_count(start.isoformat(), end.isoformat(), holidays=holiday_dates))
|
|
|
|
return {
|
|
"year": year,
|
|
"month": month,
|
|
"days_in_month": days_in_month,
|
|
"employee": employee,
|
|
"working_days": working_days,
|
|
"public_holidays": holidays,
|
|
"days": days
|
|
}
|
|
|
|
@app.post("/token")
|
|
async def login_for_access_token(
|
|
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
|
|
user = odoo_api_login(form_data.username, form_data.password)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect username or password",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
logger.info(f"User '{user.uid}' logged in successfully.")
|
|
logger.info(
|
|
f"User '{user.uid}, {user.full_name}, {user.email}' logged in successfully.")
|
|
|
|
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(
|
|
data={"sub": user.email, "uid": user.uid, "full_name": user.full_name},
|
|
expires_delta=access_token_expires
|
|
)
|
|
return Token(access_token=access_token, token_type="bearer")
|
|
|
|
@app.get("/data/{year}/{month}")
|
|
async def read_own_items(
|
|
year: int,
|
|
month: int,
|
|
current_user: Annotated[User, Depends(get_current_user)],
|
|
):
|
|
logger.info(f"User '{current_user.full_name}' accessed data for {year}-{month}.")
|
|
|
|
data = OdooClient.get_hr_attendance_data(month=month, year=year)
|
|
if not data:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail="No attendance data found for the specified month and year."
|
|
)
|
|
return create_response(year, month, data)
|
|
|
|
|