Add EBS stuff

This commit is contained in:
PapaTutuWawa 2025-04-05 00:47:29 +02:00
parent 3744c343d4
commit 697c89bb4f
15 changed files with 442 additions and 5 deletions

View File

@ -1 +1,7 @@
Test
# Private Compute Stack (Pieces)
Pieces is a mostly API-compatible implementation of AWS services.
## EC2
A very small subset of EC2 functionality is implemented.

View File

@ -0,0 +1,90 @@
import uuid
import xml.etree.ElementTree as ET
from fastapi import Response, HTTPException
from fastapi.datastructures import QueryParams
from sqlmodel import select
import libvirt
from openec2.api.attach_volume import AttachVolumeResponse
from openec2.libvirt import LibvirtSingleton
from openec2.utils.libvirt import instance_to_libvirt_xml
from openec2.db.user import User
from openec2.config import OpenEC2Config
from openec2.db import DatabaseDep
from openec2.db.instance import Instance, EBSVolume
from openec2.utils.libvirt import ebs_volume_to_libvirt_xml
def attach_volume(
params: QueryParams,
config: OpenEC2Config,
db: DatabaseDep,
user: User,
):
device = params["Device"]
instance_id = params["InstanceId"]
volume_id = params["VolumeId"]
volume = db.exec(select(EBSVolume).where(EBSVolume.id == volume_id)).first()
if volume is None:
return
instance = db.exec(select(Instance).where(Instance.id == instance_id)).first()
if instance is None:
return
attached_volume_ids = [i.id for i in instance.ebs_volumes]
if volume_id in attached_volume_ids:
print("CANNOT ATTACH THE SAME VOLUME TO THE EC2 TWICE")
return
if not volume.multi_attach_enabled and volume.instances:
print("CANNOT ATTACH NON-MULTIATTACH again")
return
# Add the required data to libvirt
conn = LibvirtSingleton.of().connection
domain = conn.lookupByName(instance_id)
# Add the memory backing if required
running = domain.isActive()
if not instance.ebs_volumes:
if running:
raise HTTPException(
status_code=500,
detail="Instance is running",
)
# Update the instance
volume.instances.append(instance)
domain_xml = domain.XMLDesc()
domain_uuid = ET.fromstring(domain_xml).find("uuid").text
print(f"Updating XML for {instance.id} with {domain_uuid}")
instance_xml = instance_to_libvirt_xml(instance, config, domain_uuid)
print(instance_xml)
conn.defineXML(instance_xml)
else:
# Attach the device
volume.instances.append(instance)
domain.attachDeviceFlags(
ebs_volume_to_libvirt_xml(volume, config),
libvirt.VIR_DOMAIN_DEVICE_MODIFY_LIVE
if running
else libvirt.VIR_DOMAIN_DEVICE_MODIFY_CONFIG,
)
db.add(volume)
db.commit()
return Response(
AttachVolumeResponse(
requestId=uuid.uuid4().hex,
volumeId=volume_id,
instanceId=instance_id,
device=device,
status="attached",
).to_xml(),
media_type="application/xml",
)

View File

@ -0,0 +1,41 @@
import uuid
from fastapi import Response
from fastapi.datastructures import QueryParams
from openec2.config import OpenEC2Config
from openec2.db import DatabaseDep
from openec2.db.user import User
from openec2.db.instance import EBSVolume
from openec2.api.create_volume import CreateVolumeResponse
def create_volume(
params: QueryParams,
config: OpenEC2Config,
db: DatabaseDep,
user: User,
):
availabilityZone = params["AvailabilityZone"]
volume_id = f"vol-{uuid.uuid4().hex}"
volume = EBSVolume(
id=volume_id,
availability_zone=availabilityZone,
multi_attach_enabled=params.get("MultiAttachEnabled", "false") == "true",
owner_id=user.id,
)
volume.path(config).mkdir()
db.add(volume)
db.commit()
return Response(
CreateVolumeResponse(
requestId=uuid.uuid4().hex,
volumeId=volume_id,
availabilityZone=volume.availability_zone,
multiAttachEnabled=volume.multi_attach_enabled,
).to_xml(),
media_type="application/xml",
)

View File

@ -0,0 +1,35 @@
import uuid
from sqlmodel import select
from fastapi import Response
from openec2.db import DatabaseDep
from openec2.config import OpenEC2Config
from fastapi.datastructures import QueryParams
from openec2.db.user import User
from openec2.db.instance import EBSVolume
from openec2.api.describe_volumes import DescribeVolumesResponse, VolumeSet, Volume
def describe_volumes(
params: QueryParams,
config: OpenEC2Config,
db: DatabaseDep,
user: User,
):
volumes = db.exec(select(EBSVolume).where(EBSVolume.owner_id == user.id)).all()
return Response(
DescribeVolumesResponse(
requestId=uuid.uuid4().hex,
volumeSet=VolumeSet(
item=[
Volume(
volumeId=volume.id,
multiAttachEnabled=volume.multi_attach_enabled,
)
for volume in volumes
],
),
).to_xml(),
media_type="application/xml",
)

View File

@ -0,0 +1,61 @@
import uuid
import libvirt
from sqlmodel import select
from fastapi import Response
from fastapi.datastructures import QueryParams
from openec2.libvirt import LibvirtSingleton
from openec2.config import OpenEC2Config
from openec2.db import DatabaseDep
from openec2.db.user import User
from openec2.db.instance import Instance, EBSVolume
from openec2.api.detach_volume import DetachVolumeResponse
from openec2.utils.libvirt import ebs_volume_to_libvirt_xml
def detach_volume(
params: QueryParams,
config: OpenEC2Config,
db: DatabaseDep,
user: User,
):
instance_id = params["InstanceId"]
volume_id = params["VolumeId"]
# Find the instance
instance = db.exec(
select(Instance).where(Instance.id == instance_id, Instance.owner_id == user.id)
).first()
if instance is None:
return
# Find the volume
volume = db.exec(
select(EBSVolume).where(
EBSVolume.id == volume_id, EBSVolume.owner_id == user.id
)
).first()
if volume is None:
return
if instance_id not in [i.id for i in volume.instances]:
return
# Remove the volume from the instance
domain = LibvirtSingleton.of().connection.lookupByName(instance_id)
domain.detachDeviceFlags(
ebs_volume_to_libvirt_xml(volume, config),
libvirt.VIR_DOMAIN_DEVICE_MODIFY_LIVE
if domain.isActive()
else libvirt.VIR_DOMAIN_DEVICE_MODIFY_CONFIG,
)
return Response(
DetachVolumeResponse(
requestId=uuid.uuid4().hex,
volumeId=volume_id,
instanceId=instance_id,
status="detached",
).to_xml(),
media_type="application/xml",
)

View File

@ -98,9 +98,9 @@ def run_instances(
user: User,
):
image_id = params["ImageId"]
instance_type = params["InstanceType"]
instance_type_name = params["InstanceType"]
instance_type = config.instances.types.get(params["InstanceType"])
instance_type = config.instances.types.get(instance_type_name)
if instance_type is None:
raise Exception(f"Unknown instance type {params['InstanceType']}")
@ -153,6 +153,7 @@ def run_instances(
else None,
privateIPv4=private_ipv4,
interfaceMac=mac,
instanceType=instance_type_name,
owner_id=user.id,
)
db.add(instance)

View File

@ -61,6 +61,9 @@ def terminate_instances(
instance_disk = config.instances.location / instance_id
instance_disk.unlink()
for volume in instance.ebs_volumes:
volume.instances.remove(instance)
image_ids.add(instance.imageId)
remove_instance_dhcp_mapping(
instance.id, instance.interfaceMac, instance.privateIPv4, db

View File

@ -0,0 +1,17 @@
from pydantic_xml import BaseXmlModel, element
class AttachVolumeResponse(
BaseXmlModel,
tag="AttachVolumeResponse",
nsmap={"": "http://ec2.amazonaws.com/doc/2016-11-15/"},
):
requestId: str = element()
volumeId: str = element()
instanceId: str = element()
device: str = element()
status: str = element()

View File

@ -0,0 +1,15 @@
from pydantic_xml import BaseXmlModel, element
class CreateVolumeResponse(
BaseXmlModel,
tag="CreateVolumeResponse",
nsmap={"": "http://ec2.amazonaws.com/doc/2016-11-15/"},
):
requestId: str = element()
volumeId: str = element()
availabilityZone: str = element()
multiAttachEnabled: bool = element()

View File

@ -0,0 +1,20 @@
from pydantic_xml import BaseXmlModel, element
class Volume(BaseXmlModel):
volumeId: str = element()
multiAttachEnabled: bool = element()
class VolumeSet(BaseXmlModel):
item: list[Volume] = element(tag="item")
class DescribeVolumesResponse(
BaseXmlModel,
tag="DescribeVolumesResponse",
nsmap={"": "http://ec2.amazonaws.com/doc/2016-11-15/"},
):
requestId: str = element()
volumeSet: VolumeSet = element()

View File

@ -0,0 +1,15 @@
from pydantic_xml import BaseXmlModel, element
class DetachVolumeResponse(
BaseXmlModel,
tag="DetachVolumeResponse",
nsmap={"": "http://ec2.amazonaws.com/doc/2016-11-15/"},
):
requestId: str = element()
volumeId: str = element()
instanceId: str = element()
status: str = element()

View File

@ -12,7 +12,7 @@ class _OpenEC2InstanceType(BaseModel):
class _OpenEC2InstanceConfig(BaseModel):
location: Path
volumes: Path
types: dict[str, _OpenEC2InstanceType]
@ -45,6 +45,7 @@ def _get_config() -> _OpenEC2Config:
seed=Path("/home/alexander/openec2/seed"),
instances=_OpenEC2InstanceConfig(
location=Path("/home/alexander/openec2/instances"),
volumes=Path("/home/alexander/openec2/volumes"),
types={
"micro": _OpenEC2InstanceType(
memory=1024,

View File

@ -1,4 +1,35 @@
from sqlmodel import SQLModel, Field, JSON, Column
from pathlib import Path
from sqlmodel import SQLModel, Field, JSON, Column, Relationship
from openec2.config import OpenEC2Config
class EBSVolumeInstanceLink(SQLModel, table=True):
instance_id: str | None = Field(
default=None, foreign_key="instance.id", primary_key=True
)
ebs_volume_id: str | None = Field(
default=None, foreign_key="ebsvolume.id", primary_key=True
)
class EBSVolume(SQLModel, table=True):
id: str = Field(primary_key=True)
availability_zone: str
multi_attach_enabled: bool
instances: list["Instance"] = Relationship(
back_populates="ebs_volumes", link_model=EBSVolumeInstanceLink
)
owner_id: int = Field(foreign_key="user.id")
def path(self, config: OpenEC2Config) -> Path:
"""Compute the path of the volume on disk."""
return config.instances.volumes / self.id
class Instance(SQLModel, table=True):
@ -7,6 +38,8 @@ class Instance(SQLModel, table=True):
# Tags associated with the VM
tags: dict = Field(sa_column=Column(JSON), default={})
instanceType: str
# ImageID of the used AMI
imageId: str
@ -21,3 +54,8 @@ class Instance(SQLModel, table=True):
# The owner that creatd the resource.
owner_id: int = Field(foreign_key="user.id")
# Attached EBS volumes
ebs_volumes: list[EBSVolume] = Relationship(
back_populates="instances", link_model=EBSVolumeInstanceLink
)

View File

@ -18,6 +18,10 @@ from openec2.actions.terminate_instances import terminate_instances
from openec2.actions.start_instances import start_instances
from openec2.actions.stop_instances import stop_instances
from openec2.actions.deregister_image import deregister_image
from openec2.actions.create_volume import create_volume
from openec2.actions.attach_volume import attach_volume
from openec2.actions.describe_volumes import describe_volumes
from openec2.actions.detach_volume import detach_volume
from openec2.db.instance import Instance
app = FastAPI()
@ -83,6 +87,10 @@ def run_action(
"StartInstances": start_instances,
"StopInstances": stop_instances,
"DeregisterImage": deregister_image,
"CreateVolume": create_volume,
"AttachVolume": attach_volume,
"DescribeVolumes": describe_volumes,
"DetachVolume": detach_volume,
}[action](query_params, config, db, user)

View File

@ -0,0 +1,86 @@
from openec2.config import OpenEC2Config
from openec2.db.instance import EBSVolume, Instance
def ebs_volume_to_libvirt_xml(volume: EBSVolume, config: OpenEC2Config) -> str:
# TODO: Honour the attached device name
return f"""
<filesystem type='mount' accessmode='passthrough'>
<driver type='virtiofs' queue='1024' />
<source dir='{config.instances.volumes / volume.id}' />
<target dir='{volume.id}' />
</filesystem>
"""
def instance_to_libvirt_xml(
instance: Instance,
config: OpenEC2Config,
uuid: str | None = None,
) -> str:
instance_type = config.instances.types[instance.instanceType]
ami_path = config.instances.location / instance.id
memory_backing = (
"""
<memoryBacking>
<source type='memfd' />
<access mode='shared' />
</memoryBacking>
"""
if instance.ebs_volumes
else ""
)
volumes = "\n".join(
ebs_volume_to_libvirt_xml(volume, config) for volume in instance.ebs_volumes
)
uuid_element = f"<uuid>{uuid}</uuid>" if uuid is not None else ""
return f"""<domain type='kvm'>
{uuid_element}
<name>{instance.id}</name>
<memory unit='MiB'>{instance_type.memory}</memory>
{memory_backing}
<vcpu placement='static'>{int(instance_type.vcpu)}</vcpu>
<os>
<type arch='x86_64'>hvm</type>
<boot dev='hd' />
<smbios mode='sysinfo' />
</os>
<sysinfo type='smbios'>
<system>
<entry name='serial'>ds=nocloud;s=http://192.168.122.1:8000/private/cloudinit/{instance.id}/</entry>
</system>
</sysinfo>
<features>
<acpi />
<apic />
<vmport state='off' />
</features>
<clock offset='utc'>
<timer name='rtc' tickpolicy='catchup'/>
<timer name='pit' tickpolicy='delay'/>
<timer name='hpet' present='no'/>
</clock>
<pm>
<suspend-to-mem enabled='no'/>
<suspend-to-disk enabled='no'/>
</pm>
<devices>
{volumes}
<disk type='file' device='disk'>
<driver name='qemu' type='qcow2'/>
<source file='{ami_path}'/>
<target dev='vda' bus='virtio'/>
</disk>
<rng model="virtio">
<backend model="random">/dev/urandom</backend>
</rng>
<interface type="network">
<source network="default"/>
<mac address="{instance.interfaceMac}" />
<model type="virtio"/>
</interface>
</devices>
</domain>
"""