176 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			176 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from datetime import date, datetime, timedelta, timezone
 | |
| from typing import Annotated
 | |
| from fastapi import Depends, FastAPI, HTTPException, status
 | |
| from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
 | |
| import jwt
 | |
| from jwt.exceptions import InvalidTokenError
 | |
| from pydantic import BaseModel
 | |
| from dotenv import load_dotenv
 | |
| import os
 | |
| import logging
 | |
| import requests
 | |
| import json
 | |
| import numpy as np
 | |
| import holidays
 | |
| import calendar
 | |
| from odoo_api.client import OdooAPIClient
 | |
| 
 | |
| # to get a string like this run:
 | |
| # openssl rand -hex 32
 | |
| 
 | |
| load_dotenv()
 | |
| SECRET_KEY = os.getenv('SECRET_KEY')
 | |
| ALGORITHM = os.getenv('ALGORITHM')
 | |
| ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES'))
 | |
| ODDO_URL = os.getenv('ODDO_URL')
 | |
| DB_NAME = os.getenv('DB_NAME')
 | |
| 
 | |
| origins_str = os.getenv('ORIGINS', '')
 | |
| origins = [origin.strip() for origin in origins_str.split(',') if origin.strip()]
 | |
| 
 | |
| class Token(BaseModel):
 | |
|     access_token: str
 | |
|     token_type: str
 | |
| 
 | |
| 
 | |
| class User(BaseModel):
 | |
|     uid: str
 | |
|     full_name: str | None = None
 | |
|     email: str | None = None
 | |
| 
 | |
| 
 | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 | |
| logger = logging.getLogger("uvicorn")
 | |
| OdooClient = OdooAPIClient(ODDO_URL, DB_NAME, SECRET_KEY, ALGORITHM)
 | |
| 
 | |
| app = FastAPI()
 | |
| 
 | |
| from fastapi.middleware.cors import CORSMiddleware
 | |
| 
 | |
| app = FastAPI()
 | |
| 
 | |
| logger.info(f"origins: {origins}")
 | |
| 
 | |
| app.add_middleware(
 | |
|     CORSMiddleware,
 | |
|     allow_origins=origins,
 | |
|     allow_credentials=True,
 | |
|     allow_methods=["*"],
 | |
|     allow_headers=["*"],
 | |
| )
 | |
| 
 | |
| def create_access_token(data: dict, expires_delta: timedelta | None = None):
 | |
|     to_encode = data.copy()
 | |
|     if expires_delta:
 | |
|         expire = datetime.now(timezone.utc) + expires_delta
 | |
|     else:
 | |
|         expire = datetime.now(timezone.utc) + timedelta(minutes=15)
 | |
| 
 | |
|     to_encode.update({"exp": expire})
 | |
|     encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
 | |
| 
 | |
|     return encoded_jwt
 | |
| 
 | |
| 
 | |
| async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> User:
 | |
|     credentials_exception = HTTPException(
 | |
|         status_code=status.HTTP_401_UNAUTHORIZED,
 | |
|         detail="Could not validate credentials",
 | |
|         headers={"WWW-Authenticate": "Bearer"},
 | |
|     )
 | |
|     try:
 | |
|         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
 | |
|         uid = payload.get("uid")
 | |
|         email = payload.get("sub")
 | |
|         full_name = payload.get("full_name")
 | |
|         if uid is None or email is None:
 | |
|             raise credentials_exception
 | |
|         
 | |
|         return User(uid=uid, email=email, full_name=full_name)
 | |
|     except InvalidTokenError:
 | |
|         raise credentials_exception
 | |
| 
 | |
| 
 | |
| def odoo_api_login(username: str, password: str) -> User | None:
 | |
|     result = OdooClient.login(username, password)
 | |
|     if result.get('uid'):
 | |
|         return User(
 | |
|             uid=str(result.get('uid')),
 | |
|             full_name=result.get('name'),
 | |
|             email=result.get('username')
 | |
|         )
 | |
|     return None
 | |
| 
 | |
| def get_polish_holidays(year: int, month: int):
 | |
|     pl_holidays = holidays.Poland(years=year)
 | |
| 
 | |
|     # Filtrowanie po miesiącu
 | |
|     holidays_list = [
 | |
|         {"date": date.strftime("%Y-%m-%d"), "name": name}
 | |
|         for date, name in sorted(pl_holidays.items())
 | |
|         if date.month == month
 | |
|     ]
 | |
| 
 | |
|     return holidays_list
 | |
| 
 | |
| def create_response(year: int, month: int, days: list) -> dict:
 | |
|     employee = days[0]["employee_id"][1]
 | |
|     days_in_month = calendar.monthrange(year, month)[1]
 | |
|     holidays = get_polish_holidays(year, month)
 | |
| 
 | |
|     start = date(year, month, 1)
 | |
|     end = date(year, month, days_in_month) + timedelta(days=1)
 | |
| 
 | |
|     # 4. Wyliczenie dni roboczych:
 | |
|     holiday_dates = [h['date'] for h in holidays]
 | |
|     working_days = int(np.busday_count(start.isoformat(), end.isoformat(), holidays=holiday_dates))
 | |
| 
 | |
|     return {
 | |
|         "year": year,
 | |
|         "month": month,
 | |
|         "days_in_month": days_in_month,
 | |
|         "employee": employee,
 | |
|         "working_days": working_days,
 | |
|         "public_holidays": holidays,
 | |
|         "days": days
 | |
|     }
 | |
| 
 | |
| @app.post("/token")
 | |
| async def login_for_access_token(
 | |
|         form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
 | |
|     user = odoo_api_login(form_data.username, form_data.password)
 | |
|     if not user:
 | |
|         raise HTTPException(
 | |
|             status_code=status.HTTP_401_UNAUTHORIZED,
 | |
|             detail="Incorrect username or password",
 | |
|             headers={"WWW-Authenticate": "Bearer"},
 | |
|         )
 | |
|     logger.info(f"User '{user.uid}' logged in successfully.")
 | |
|     logger.info(
 | |
|         f"User '{user.uid}, {user.full_name}, {user.email}' logged in successfully.")
 | |
| 
 | |
|     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
 | |
|     access_token = create_access_token(
 | |
|         data={"sub": user.email, "uid": user.uid, "full_name": user.full_name},
 | |
|         expires_delta=access_token_expires
 | |
|     )
 | |
|     return Token(access_token=access_token, token_type="bearer")
 | |
| 
 | |
| @app.get("/data/{year}/{month}")
 | |
| async def read_own_items(
 | |
|     year: int,
 | |
|     month: int,
 | |
|     current_user: Annotated[User, Depends(get_current_user)],
 | |
| ):
 | |
|     logger.info(f"User '{current_user.full_name}' accessed data for {year}-{month}.")
 | |
| 
 | |
|     data = OdooClient.get_hr_attendance_data(month=month, year=year)
 | |
|     if not data:
 | |
|         raise HTTPException(
 | |
|             status_code=404,
 | |
|             detail="No attendance data found for the specified month and year."
 | |
|         )
 | |
|     return create_response(year, month, data)
 | |
|     
 | |
| 
 |