#[cfg(feature = "k8")] #[cfg(not(feature = "k8_stream"))] mod integration_tests { use std::collections::HashMap; use std::time::Duration; use anyhow::Result; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use tracing::debug; use fluvio_future::test_async; use fluvio_future::timer::sleep; use k8_client::http::status::StatusCode; use k8_client::K8Client; use k8_metadata_client::MetadataClient; use k8_types::core::service::{LoadBalancerType, ServicePort}; use k8_types::core::service::{LoadBalancerIngress, ServiceSpec}; use k8_types::{InputK8Obj, InputObjectMeta, Spec, MetaStatus}; const SPU_DEFAULT_NAME: &str = "spu"; const DELAY: Duration = Duration::from_millis(100); fn create_client() -> K8Client { K8Client::try_default().expect("cluster not initialized") } fn new_service() -> InputK8Obj { let rng = thread_rng(); let rname: String = rng .sample_iter(&Alphanumeric) .map(char::from) .take(5) .collect(); let name = format!("test{}", rname); let mut labels = HashMap::new(); labels.insert("app".to_owned(), SPU_DEFAULT_NAME.to_owned()); let mut selector = HashMap::new(); selector.insert("app".to_owned(), SPU_DEFAULT_NAME.to_owned()); let service_spec = ServiceSpec { ports: vec![ServicePort { port: 9092, ..Default::default() }], selector: Some(selector), r#type: Some(LoadBalancerType::LoadBalancer), ..Default::default() }; let new_item: InputK8Obj = InputK8Obj { api_version: ServiceSpec::api_version(), kind: ServiceSpec::kind(), metadata: InputObjectMeta { name: name.to_lowercase(), labels, namespace: "default".to_owned(), ..Default::default() }, spec: service_spec, ..Default::default() }; new_item } #[test_async] async fn test_object_conflict() -> Result<()> { let new_item = new_service(); debug!("creating new service: {:#?}", &new_item); let client = create_client(); let created_item = client .create_item::(new_item) .await .expect("service should be created"); sleep(DELAY).await; let item = client .retrieve_item::(&created_item.metadata) .await .expect("retrieval") .expect("service should exist"); let initial_version = item.metadata.resource_version.clone(); debug!("client resource_version: {}", initial_version); // update status let mut new_status = item.status.clone(); let ingress = LoadBalancerIngress { ip: Some("0.0.0.0".to_string()), ip_mode: Some("VIP".to_string()), ..Default::default() }; new_status.load_balancer.ingress.push(ingress); let mut new_status2 = item.status.clone(); let ingress = LoadBalancerIngress { ip: Some("1.1.1.1".to_string()), ip_mode: Some("Proxy".to_string()), ..Default::default() }; new_status2.load_balancer.ingress.push(ingress); // manually set ip external ip let status_update1 = item.as_status_update(new_status); let status_update2 = item.as_status_update(new_status2); let updated_item = client.update_status(&status_update1).await.expect("update"); let updated_version = updated_item.metadata.resource_version.clone(); debug!("updated resource_version: {}", updated_version); assert_ne!(updated_version, initial_version); // do another update status which leads to conflict. let err = client .update_status(&status_update2) .await .expect_err("update"); if let Some(status) = err.downcast_ref::() { assert_eq!(status.code, Some(StatusCode::CONFLICT.as_u16())) } else { panic!("expecting conflict error"); } // clean up let input_metadata: InputObjectMeta = updated_item.metadata.into(); client .delete_item::(&input_metadata) .await .expect("delete should work"); Ok(()) } }