|
- import base64
- from sqlalchemy.orm import Session
- from schemas.maintenancejob import CreateMaintenanceJob
- from db.models.maintenancejob import MaintenanceJob
- from schemas.carpart import CreateCarPart
- from db.models.carpart import CarPart
- from db.repository.user import get_car_driver
-
-
- def create_new_maintenancejob(
- maintenancejob: CreateMaintenanceJob, db: Session
- ):
- vehicledriver = get_car_driver(maintenancejob.VehicleID, db)
- if (vehicledriver != False):
- vehicledriver = vehicledriver.Id
- else:
- return "nodriver"
- maintenancejob_object = MaintenanceJob(
- Description=maintenancejob.Description,
- VehicleID=maintenancejob.VehicleID,
- Date=maintenancejob.Date,
- VehicleDriverId=get_car_driver(maintenancejob.VehicleID, db).Id,
- Status="Requested",
- )
- print("OBJECT CREATED")
- db.add(maintenancejob_object)
- db.commit()
- db.refresh(maintenancejob_object)
- return maintenancejob_object
-
-
- def create_car_part(car_part: CreateCarPart, db: Session):
- car_part_object = CarPart(
- ParentId=car_part.ParentId,
- Name=car_part.Name,
- Number=car_part.Number,
- Condition=car_part.Condition,
- ImageURL=car_part.image.file.read(),
- Cost=car_part.Cost,
- )
- print("OBJECT CREATED")
- db.add(car_part_object)
- db.commit()
- print("OBJECT SAVED")
- db.refresh(car_part_object)
- print("OBJECT REFRESHED")
- res_obj = car_part_object.__dict__
- res_obj["image"] = base64.b64encode(car_part_object.ImageURL).decode("ascii")
- return res_obj
-
-
- def calculate_total_cost(car_parts: CarPart):
- total_cost = 0
- for part in car_parts:
- total_cost += part.Cost
- return total_cost
-
-
- def get_all_maintenance_jobs(db: Session):
- maintenancejobs = db.query(MaintenanceJob).all()
- print("DB Access complete")
- result = []
- for job in maintenancejobs:
- job_dict = job.__dict__
- print(job_dict)
- job_dict["CarPartsList"] = [part.__dict__ for part in job.CarParts]
- print(len(job_dict["CarPartsList"]))
- for part in job_dict["CarPartsList"]:
- if (part["ImageURL"] is None) or (part["ImageURL"] == ""):
- part["image"] = ""
- else:
- part["image"] = base64.b64encode(part["ImageURL"]).decode("ascii")
- job_dict["TotalCost"] = calculate_total_cost(job.CarParts)
- job_dict["AssignedTo"] = job.CreatedBy.__dict__
- job_dict["Vehicle"] = job.Vehicle.__dict__
- result.append(job_dict)
- print("Returning...")
- return maintenancejobs
-
-
- def get_maintenance_job(maintenancejob_id: int, db: Session):
- maintenancejob = (
- db.query(MaintenanceJob).filter(MaintenanceJob.Id == maintenancejob_id).first()
- )
- if maintenancejob is None:
- return None
- res = maintenancejob.__dict__
- res["CarPartsList"] = [part.__dict__ for part in maintenancejob.CarParts]
- for part in maintenancejob.CarPartsList:
- part["image"] = base64.b64encode(part["ImageURL"]).decode("ascii")
-
- # print(type(result.CarPartsList))
- res["TotalCost"] = calculate_total_cost(maintenancejob.CarParts)
- # print(result.TotalCost)
- if maintenancejob.Status == "Complete":
- res["FinishedBy"] = maintenancejob.FinishedBy.__dict__
- res["Vehicle"] = maintenancejob.Vehicle.__dict__
- print("DB Access complete")
- return maintenancejob
-
-
- def change_maintenance_status(maintenancejob_id: int, status: str, worker_id: int, db: Session):
- maintenancejob = (
- db.query(MaintenanceJob).filter(MaintenanceJob.Id == maintenancejob_id).first()
- )
- if maintenancejob is None:
- return None
- if (status == "Complete"):
- maintenancejob.MaintenanceWorker = worker_id
- maintenancejob.Status = status
- db.commit()
- db.refresh(maintenancejob)
- return maintenancejob
- else:
- return None
-
-
|