Background

Today's topic leans toward coding techniques, so it may be a bit demanding for readers who have never written code or have little development experience. The previous article showed how to generate relatively stable wide-view video. Yesterday's implementation was fairly simple and relied mainly on the webui to do the generation, but the result was plainly not great: character continuity and motion continuity were insufficient. The reasons are:
1. The batch image2image mode of the stable diffusion webui cannot really do image2image yet.
2. Control comes from a fixed text prompt plus several ControlNets. If you want the frames to be stable, you actually get better quality, continuity and stability when each frame carries its own image control signal and a slightly different prompt: image2image controls the overall style and content, ControlNet controls the details, and the prompt controls content variation.

This article will not go into the specifics of stable image generation. Instead it covers something more important: how to reproduce the webui's features in code, and how to build a more controllable, more efficient image-generation pipeline with code.

Contents
1. Build a Stable Diffusion + ControlNet generation pipeline in code
2. Build a multi-ControlNet generation pipeline
3. How to implement and plug in features that diffusers does not have

Build a Stable Diffusion + ControlNet generation pipeline in code

Install the diffusers package

pip install --upgrade diffusers accelerate transformers
# to install the latest diffusers from source, run this instead
pip install git+https://github.com/huggingface/diffusers
! pip install controlnet_hinter==0.0.5

Build the Stable Diffusion + ControlNet script

from diffusers import StableDiffusionControlNetPipeline, ControlNetModel
from diffusers.utils import load_image
import torch
import numpy as np
from PIL import Image
import cv2
import controlnet_hinter  # used below to extract pose/depth/segmentation/hough hints

# load a test image
original_image = load_image("https://huggingface.co/datasets/diffusers/test-arrays/resolve/main/stable_diffusion_imgvar/input_image_vermeer.png")
original_image

# set up the ControlNet + Stable Diffusion pipeline
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", cache_dir="./")
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to("cuda")
pipe.enable_xformers_memory_efficient_attention()
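The multi-ControlNet demo later in this article loads its models in half precision, and the same trick applies here if GPU memory is tight. Below is a minimal sketch of that variant (my own addition, not part of the original walkthrough):

# optional: build the same canny pipeline in fp16 to roughly halve GPU memory use
controlnet_fp16 = ControlNetModel.from_pretrained(
    "lllyasviel/sd-controlnet-canny", torch_dtype=torch.float16
)
pipe_fp16 = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    controlnet=controlnet_fp16,
    safety_checker=None,
    torch_dtype=torch.float16,
).to("cuda")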
# extract the canny edges of the image
canny_edged_image = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_canny_edged.png")
canny_edged_image

# generate an image with the canny ControlNet
generator = torch.Generator(device="cpu").manual_seed(3)
image = pipe(
    prompt="best quality, extremely detailed",
    negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
    image=canny_edged_image,
    num_inference_steps=30,
    generator=generator,
).images[0]
image

# set the canny edge-detection thresholds
control_image = np.array(original_image)
low_threshold = 10    # default 100
high_threshold = 150  # default 200
control_image = cv2.Canny(control_image, low_threshold, high_threshold)
control_image = control_image[:, :, None]
control_image = np.concatenate([control_image, control_image, control_image], axis=2)
control_image = Image.fromarray(control_image)
control_image
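Lower thresholds keep more (and noisier) edges, higher thresholds keep only the strong contours, which directly changes how tightly the generation follows the source image. A small helper like the sketch below (my own addition, with hypothetical names, not from the original post) makes it easy to preview a few threshold pairs before generating:

# preview how different Canny thresholds change the control image
def preview_canny_thresholds(pil_image, threshold_pairs=((50, 100), (100, 200), (10, 150))):
    hints = []
    src = np.array(pil_image)
    for low, high in threshold_pairs:
        edges = cv2.Canny(src, low, high)      # single-channel edge map
        edges = np.stack([edges] * 3, axis=2)  # replicate to 3 channels for ControlNet
        hints.append(Image.fromarray(edges))
    return hints

# e.g. display the first candidate: preview_canny_thresholds(original_image)[0]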
# generate an image with the canny ControlNet using the custom thresholds
generator = torch.Generator(device="cpu").manual_seed(3)
image = pipe(
    prompt="best quality, extremely detailed",
    negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
    image=control_image,
    num_inference_steps=30,
    generator=generator,
).images[0]
image

# ControlNet pipeline conditioned on pose
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-openpose", cache_dir="./")
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to("cuda")
pipe.enable_xformers_memory_efficient_attention()

# the pose image
pose_image = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/pose.png")
pose_image

# generate with the pose pipeline
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(
    prompt="best quality, extremely detailed, football, a boy",
    negative_prompt="lowres, bad anatomy, worst quality, low quality",
    image=pose_image,
    generator=generator,
    num_inference_steps=30,
).images[0]
image

# extract a pose control hint with controlnet_hinter
control_image = controlnet_hinter.hint_openpose(original_image)
control_image

# generate with the pose pipeline
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(
    prompt="best quality, extremely detailed",
    negative_prompt="lowres, bad anatomy, worst quality, low quality",
    image=control_image,
    generator=generator,
    num_inference_steps=30,
).images[0]
image

# depth map
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-depth")
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to("cuda")
pipe.enable_xformers_memory_efficient_attention()
control_image = controlnet_hinter.hint_depth(original_image)
control_image
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(
    prompt="best quality, extremely detailed",
    negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
    image=control_image,
    generator=generator,
    num_inference_steps=30,
).images[0]
image
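Each example here drops the previous pipeline and rebuilds a new one from scratch, which reloads the Stable Diffusion weights every time. A possible shortcut, shown as a sketch below (my own addition, relying on the public constructor signature of StableDiffusionControlNetPipeline rather than anything in the original post), is to keep the already-loaded components and swap only the ControlNet, for example for the scribble model used in the next step:

# reuse the already-loaded SD components and swap in a different ControlNet
scribble_controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-scribble").to("cuda")
scribble_pipe = StableDiffusionControlNetPipeline(
    vae=pipe.vae,
    text_encoder=pipe.text_encoder,
    tokenizer=pipe.tokenizer,
    unet=pipe.unet,
    controlnet=scribble_controlnet,
    scheduler=pipe.scheduler,
    safety_checker=None,
    feature_extractor=pipe.feature_extractor,
    requires_safety_checker=False,
)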
# scribble (hand-drawn outline)
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-scribble")
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to("cuda")
pipe.enable_xformers_memory_efficient_attention()
scribble_image = load_image("https://github.com/lllyasviel/ControlNet/raw/main/test_imgs/user_1.png")
scribble_image
generator = torch.Generator(device="cpu").manual_seed(1)
image = pipe(
    prompt="a turtle, best quality, extremely detailed",
    negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
    image=scribble_image,
    generator=generator,
    num_inference_steps=30,
).images[0]
image

# segmentation
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-seg")
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to("cuda")
pipe.enable_xformers_memory_efficient_attention()
control_image = controlnet_hinter.hint_segmentation(original_image)
control_image
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(
    prompt="best quality, extremely detailed",
    negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
    image=control_image,
    generator=generator,
    num_inference_steps=30,
).images[0]
image

# Hough lines (MLSD) - mostly used for architecture and large scenes
controlnet = None
pipe = None
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-mlsd")
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to("cuda")
pipe.enable_xformers_memory_efficient_attention()
control_image = controlnet_hinter.hint_hough(original_image)
control_image
generator = torch.Generator(device="cpu").manual_seed(2)
image = pipe(
    prompt="best quality, extremely detailed",
    negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
    image=control_image,
    generator=generator,
    num_inference_steps=30,
).images[0]
image

Build a multi-ControlNet generation pipeline

The diffusers package does not yet ship a pipeline that drives generation with several ControlNets at once. If you need multi-ControlNet control, you can use the code below.

# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the License);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an AS IS BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import PIL.Image
import torch
from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizer

from diffusers import AutoencoderKL, ControlNetModel, UNet2DConditionModel
from diffusers.schedulers import KarrasDiffusionSchedulers
from diffusers.utils import (
    PIL_INTERPOLATION,
    is_accelerate_available,
    is_accelerate_version,
    logging,
    randn_tensor,
    replace_example_docstring,
)
from diffusers.pipeline_utils import DiffusionPipeline
from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput, StableDiffusionSafetyChecker
from diffusers.models.controlnet import ControlNetOutputlogger logging.get_logger(__name__) # pylint: disableinvalid-nameclass ControlNetProcessor(object):def __init__(self,controlnet: ControlNetModel,image: Union[torch.FloatTensor, PIL.Image.Image, List[torch.FloatTensor], List[PIL.Image.Image]],conditioning_scale: float 1.0,):self.controlnet controlnetself.image imageself.conditioning_scale conditioning_scaledef _default_height_width(self, height, width, image):if isinstance(image, list):image image[0]if height is None:if isinstance(image, PIL.Image.Image):height image.heightelif isinstance(image, torch.Tensor):height image.shape[3]height (height // 8) * 8 # round down to nearest multiple of 8if width is None:if isinstance(image, PIL.Image.Image):width image.widthelif isinstance(image, torch.Tensor):width image.shape[2]width (width // 8) * 8 # round down to nearest multiple of 8return height, widthdef default_height_width(self, height, width):return self._default_height_width(height, width, self.image)def _prepare_image(self, image, width, height, batch_size, num_images_per_prompt, device, dtype):if not isinstance(image, torch.Tensor):if isinstance(image, PIL.Image.Image):image [image]if isinstance(image[0], PIL.Image.Image):image [np.array(i.resize((width, height), resamplePIL_INTERPOLATION[lanczos]))[None, :] for i in image]image np.concatenate(image, axis0)image np.array(image).astype(np.float32) / 255.0image image.transpose(0, 3, 1, 2)image torch.from_numpy(image)elif isinstance(image[0], torch.Tensor):image torch.cat(image, dim0)image_batch_size image.shape[0]if image_batch_size 1:repeat_by batch_sizeelse:# image batch size is the same as prompt batch sizerepeat_by num_images_per_promptimage image.repeat_interleave(repeat_by, dim0)image image.to(devicedevice, dtypedtype)return imagedef _check_inputs(self, image, prompt, prompt_embeds):image_is_pil isinstance(image, PIL.Image.Image)image_is_tensor isinstance(image, torch.Tensor)image_is_pil_list isinstance(image, list) and isinstance(image[0], PIL.Image.Image)image_is_tensor_list isinstance(image, list) and isinstance(image[0], torch.Tensor)if not image_is_pil and not image_is_tensor and not image_is_pil_list and not image_is_tensor_list:raise TypeError(image must be passed and be one of PIL image, torch tensor, list of PIL images, or list of torch tensors)if image_is_pil:image_batch_size 1elif image_is_tensor:image_batch_size image.shape[0]elif image_is_pil_list:image_batch_size len(image)elif image_is_tensor_list:image_batch_size len(image)if prompt is not None and isinstance(prompt, str):prompt_batch_size 1elif prompt is not None and isinstance(prompt, list):prompt_batch_size len(prompt)elif prompt_embeds is not None:prompt_batch_size prompt_embeds.shape[0]if image_batch_size ! 1 and image_batch_size ! prompt_batch_size:raise ValueError(fIf image batch size is not 1, image batch size must be same as prompt batch size. 
image batch size: {image_batch_size}, prompt batch size: {prompt_batch_size})def check_inputs(self, prompt, prompt_embeds):self._check_inputs(self.image, prompt, prompt_embeds)def prepare_image(self, width, height, batch_size, num_images_per_prompt, device, do_classifier_free_guidance):self.image self._prepare_image(self.image, width, height, batch_size, num_images_per_prompt, device, self.controlnet.dtype)if do_classifier_free_guidance:self.image torch.cat([self.image] * 2)def __call__(self,sample: torch.FloatTensor,timestep: Union[torch.Tensor, float, int],encoder_hidden_states: torch.Tensor,class_labels: Optional[torch.Tensor] None,timestep_cond: Optional[torch.Tensor] None,attention_mask: Optional[torch.Tensor] None,cross_attention_kwargs: Optional[Dict[str, Any]] None,return_dict: bool True,) - Tuple:down_block_res_samples, mid_block_res_sample self.controlnet(samplesample,controlnet_condself.image,timesteptimestep,encoder_hidden_statesencoder_hidden_states,class_labelsclass_labels,timestep_condtimestep_cond,attention_maskattention_mask,cross_attention_kwargscross_attention_kwargs,return_dictFalse,)down_block_res_samples [down_block_res_sample * self.conditioning_scale for down_block_res_sample in down_block_res_samples]mid_block_res_sample * self.conditioning_scalereturn (down_block_res_samples, mid_block_res_sample)EXAMPLE_DOC_STRING Examples:py # !pip install opencv-python transformers accelerate from diffusers import StableDiffusionControlNetPipeline, ControlNetModel, UniPCMultistepScheduler from diffusers.utils import load_image import numpy as np import torch import cv2 from PIL import Image # download an image image load_image(... https://hf.co/datasets/huggingface/documentation-images/resolve/main/diffusers/input_image_vermeer.png... ) image np.array(image) # get canny image image cv2.Canny(image, 100, 200) image image[:, :, None] image np.concatenate([image, image, image], axis2) canny_image Image.fromarray(image) # load control net and stable diffusion v1-5 controlnet ControlNetModel.from_pretrained(lllyasviel/sd-controlnet-canny, torch_dtypetorch.float16) pipe StableDiffusionControlNetPipeline.from_pretrained(... runwayml/stable-diffusion-v1-5, controlnetcontrolnet, torch_dtypetorch.float16... ) # speed up diffusion process with faster scheduler and memory optimization pipe.scheduler UniPCMultistepScheduler.from_config(pipe.scheduler.config) # remove following line if xformers is not installed pipe.enable_xformers_memory_efficient_attention() pipe.enable_model_cpu_offload() # generate image generator torch.manual_seed(0) image pipe(... futuristic-looking woman, num_inference_steps20, generatorgenerator, imagecanny_image... ).images[0]
class StableDiffusionMultiControlNetPipeline(DiffusionPipeline):rPipeline for text-to-image generation using Stable Diffusion with ControlNet guidance.This model inherits from [DiffusionPipeline]. Check the superclass documentation for the generic methods thelibrary implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)Args:vae ([AutoencoderKL]):Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.text_encoder ([CLIPTextModel]):Frozen text-encoder. Stable Diffusion uses the text portion of[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specificallythe [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.tokenizer (CLIPTokenizer):Tokenizer of class[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).unet ([UNet2DConditionModel]): Conditional U-Net architecture to denoise the encoded image latents.scheduler ([SchedulerMixin]):A scheduler to be used in combination with unet to denoise the encoded image latents. Can be one of[DDIMScheduler], [LMSDiscreteScheduler], or [PNDMScheduler].safety_checker ([StableDiffusionSafetyChecker]):Classification module that estimates whether generated images could be considered offensive or harmful.Please, refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for details.feature_extractor ([CLIPFeatureExtractor]):Model that extracts features from generated images to be used as inputs for the safety_checker._optional_components [safety_checker, feature_extractor]def __init__(self,vae: AutoencoderKL,text_encoder: CLIPTextModel,tokenizer: CLIPTokenizer,unet: UNet2DConditionModel,scheduler: KarrasDiffusionSchedulers,safety_checker: StableDiffusionSafetyChecker,feature_extractor: CLIPFeatureExtractor,requires_safety_checker: bool True,):super().__init__()if safety_checker is None and requires_safety_checker:logger.warning(fYou have disabled the safety checker for {self.__class__} by passing safety_checkerNone. Ensure that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered results in services or applications open to the public. Both the diffusers team and Hugging Face strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling it only for use-cases that involve analyzing network behavior or auditing its results. For more information, please have a look at https://github.com/huggingface/diffusers/pull/254 .)if safety_checker is not None and feature_extractor is None:raise ValueError(Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety checker. If you do not want to use the safety checker, you can pass safety_checkerNone instead.)self.register_modules(vaevae,text_encodertext_encoder,tokenizertokenizer,unetunet,schedulerscheduler,safety_checkersafety_checker,feature_extractorfeature_extractor,)self.vae_scale_factor 2 ** (len(self.vae.config.block_out_channels) - 1)self.register_to_config(requires_safety_checkerrequires_safety_checker)# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.enable_vae_slicingdef enable_vae_slicing(self):rEnable sliced VAE decoding.When this option is enabled, the VAE will split the input tensor in slices to compute decoding in severalsteps. 
This is useful to save some memory and allow larger batch sizes.self.vae.enable_slicing()# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.disable_vae_slicingdef disable_vae_slicing(self):rDisable sliced VAE decoding. If enable_vae_slicing was previously invoked, this method will go back tocomputing decoding in one step.self.vae.disable_slicing()def enable_sequential_cpu_offload(self, gpu_id0):rOffloads all models to CPU using accelerate, significantly reducing memory usage. When called, unet,text_encoder, vae, controlnet, and safety checker have their state dicts saved to CPU and then are moved to atorch.device(meta) and loaded to GPU only when their specific submodule has its forward method called.Note that offloading happens on a submodule basis. Memory savings are higher than withenable_model_cpu_offload, but performance is lower.if is_accelerate_available():from accelerate import cpu_offloadelse:raise ImportError(Please install accelerate via pip install accelerate)device torch.device(fcuda:{gpu_id})for cpu_offloaded_model in [self.unet, self.text_encoder, self.vae]:cpu_offload(cpu_offloaded_model, device)if self.safety_checker is not None:cpu_offload(self.safety_checker, execution_devicedevice, offload_buffersTrue)def enable_model_cpu_offload(self, gpu_id0):rOffloads all models to CPU using accelerate, reducing memory usage with a low impact on performance. Comparedto enable_sequential_cpu_offload, this method moves one whole model at a time to the GPU when its forwardmethod is called, and the model remains in GPU until the next model runs. Memory savings are lower than withenable_sequential_cpu_offload, but performance is much better due to the iterative execution of the unet.if is_accelerate_available() and is_accelerate_version(, 0.17.0.dev0):from accelerate import cpu_offload_with_hookelse:raise ImportError(enable_model_offload requires accelerate v0.17.0 or higher.)device torch.device(fcuda:{gpu_id})hook Nonefor cpu_offloaded_model in [self.text_encoder, self.unet, self.vae]:_, hook cpu_offload_with_hook(cpu_offloaded_model, device, prev_module_hookhook)if self.safety_checker is not None:# the safety checker can offload the vae again_, hook cpu_offload_with_hook(self.safety_checker, device, prev_module_hookhook)# control net hook has be manually offloaded as it alternates with unet# cpu_offload_with_hook(self.controlnet, device)# Well offload the last model manually.self.final_offload_hook hookproperty# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._execution_devicedef _execution_device(self):rReturns the device on which the pipelines models will be executed. 
After callingpipeline.enable_sequential_cpu_offload() the execution device can only be inferred from Accelerates modulehooks.if not hasattr(self.unet, _hf_hook):return self.devicefor module in self.unet.modules():if (hasattr(module, _hf_hook)and hasattr(module._hf_hook, execution_device)and module._hf_hook.execution_device is not None):return torch.device(module._hf_hook.execution_device)return self.device# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_promptdef _encode_prompt(self,prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_promptNone,prompt_embeds: Optional[torch.FloatTensor] None,negative_prompt_embeds: Optional[torch.FloatTensor] None,):rEncodes the prompt into text encoder hidden states.Args:prompt (str or List[str], *optional*):prompt to be encodeddevice: (torch.device):torch devicenum_images_per_prompt (int):number of images that should be generated per promptdo_classifier_free_guidance (bool):whether to use classifier free guidance or notnegative_prompt (str or List[str], *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to passnegative_prompt_embeds. instead. If not defined, one has to pass negative_prompt_embeds. instead.Ignored when not using guidance (i.e., ignored if guidance_scale is less than 1).prompt_embeds (torch.FloatTensor, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from prompt input argument.negative_prompt_embeds (torch.FloatTensor, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. If not provided, negative_prompt_embeds will be generated from negative_prompt inputargument.if prompt is not None and isinstance(prompt, str):batch_size 1elif prompt is not None and isinstance(prompt, list):batch_size len(prompt)else:batch_size prompt_embeds.shape[0]if prompt_embeds is None:text_inputs self.tokenizer(prompt,paddingmax_length,max_lengthself.tokenizer.model_max_length,truncationTrue,return_tensorspt,)text_input_ids text_inputs.input_idsuntruncated_ids self.tokenizer(prompt, paddinglongest, return_tensorspt).input_idsif untruncated_ids.shape[-1] text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):removed_text self.tokenizer.batch_decode(untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1])logger.warning(The following part of your input was truncated because CLIP can only handle sequences up tof {self.tokenizer.model_max_length} tokens: {removed_text})if hasattr(self.text_encoder.config, use_attention_mask) and self.text_encoder.config.use_attention_mask:attention_mask text_inputs.attention_mask.to(device)else:attention_mask Noneprompt_embeds self.text_encoder(text_input_ids.to(device),attention_maskattention_mask,)prompt_embeds prompt_embeds[0]prompt_embeds prompt_embeds.to(dtypeself.text_encoder.dtype, devicedevice)bs_embed, seq_len, _ prompt_embeds.shape# duplicate text embeddings for each generation per prompt, using mps friendly methodprompt_embeds prompt_embeds.repeat(1, num_images_per_prompt, 1)prompt_embeds prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1)# get unconditional embeddings for classifier free guidanceif do_classifier_free_guidance and negative_prompt_embeds is None:uncond_tokens: List[str]if negative_prompt is None:uncond_tokens [] * batch_sizeelif type(prompt) is not type(negative_prompt):raise 
TypeError(fnegative_prompt should be the same type to prompt, but got {type(negative_prompt)} !f {type(prompt)}.)elif isinstance(negative_prompt, str):uncond_tokens [negative_prompt]elif batch_size ! len(negative_prompt):raise ValueError(fnegative_prompt: {negative_prompt} has batch size {len(negative_prompt)}, but prompt:f {prompt} has batch size {batch_size}. Please make sure that passed negative_prompt matches the batch size of prompt.)else:uncond_tokens negative_promptmax_length prompt_embeds.shape[1]uncond_input self.tokenizer(uncond_tokens,paddingmax_length,max_lengthmax_length,truncationTrue,return_tensorspt,)if hasattr(self.text_encoder.config, use_attention_mask) and self.text_encoder.config.use_attention_mask:attention_mask uncond_input.attention_mask.to(device)else:attention_mask Nonenegative_prompt_embeds self.text_encoder(uncond_input.input_ids.to(device),attention_maskattention_mask,)negative_prompt_embeds negative_prompt_embeds[0]if do_classifier_free_guidance:# duplicate unconditional embeddings for each generation per prompt, using mps friendly methodseq_len negative_prompt_embeds.shape[1]negative_prompt_embeds negative_prompt_embeds.to(dtypeself.text_encoder.dtype, devicedevice)negative_prompt_embeds negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)negative_prompt_embeds negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)# For classifier free guidance, we need to do two forward passes.# Here we concatenate the unconditional and text embeddings into a single batch# to avoid doing two forward passesprompt_embeds torch.cat([negative_prompt_embeds, prompt_embeds])return prompt_embeds# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checkerdef run_safety_checker(self, image, device, dtype):if self.safety_checker is not None:safety_checker_input self.feature_extractor(self.numpy_to_pil(image), return_tensorspt).to(device)image, has_nsfw_concept self.safety_checker(imagesimage, clip_inputsafety_checker_input.pixel_values.to(dtype))else:has_nsfw_concept Nonereturn image, has_nsfw_concept# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latentsdef decode_latents(self, latents):latents 1 / self.vae.config.scaling_factor * latentsimage self.vae.decode(latents).sampleimage (image / 2 0.5).clamp(0, 1)# we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16image image.cpu().permute(0, 2, 3, 1).float().numpy()return image# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargsdef prepare_extra_step_kwargs(self, generator, eta):# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502# and should be between [0, 1]accepts_eta eta in set(inspect.signature(self.scheduler.step).parameters.keys())extra_step_kwargs {}if accepts_eta:extra_step_kwargs[eta] eta# check if the scheduler accepts generatoraccepts_generator generator in set(inspect.signature(self.scheduler.step).parameters.keys())if accepts_generator:extra_step_kwargs[generator] generatorreturn extra_step_kwargsdef check_inputs(self,prompt,height,width,callback_steps,negative_promptNone,prompt_embedsNone,negative_prompt_embedsNone,):if height % 8 ! 0 or width % 8 ! 
0:raise ValueError(fheight and width have to be divisible by 8 but are {height} and {width}.)if (callback_steps is None) or (callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps 0)):raise ValueError(fcallback_steps has to be a positive integer but is {callback_steps} of typef {type(callback_steps)}.)if prompt is not None and prompt_embeds is not None:raise ValueError(fCannot forward both prompt: {prompt} and prompt_embeds: {prompt_embeds}. Please make sure to only forward one of the two.)elif prompt is None and prompt_embeds is None:raise ValueError(Provide either prompt or prompt_embeds. Cannot leave both prompt and prompt_embeds undefined.)elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):raise ValueError(fprompt has to be of type str or list but is {type(prompt)})if negative_prompt is not None and negative_prompt_embeds is not None:raise ValueError(fCannot forward both negative_prompt: {negative_prompt} and negative_prompt_embeds:f {negative_prompt_embeds}. Please make sure to only forward one of the two.)if prompt_embeds is not None and negative_prompt_embeds is not None:if prompt_embeds.shape ! negative_prompt_embeds.shape:raise ValueError(prompt_embeds and negative_prompt_embeds must have the same shape when passed directly, butf got: prompt_embeds {prompt_embeds.shape} ! negative_prompt_embedsf {negative_prompt_embeds.shape}.)# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latentsdef prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latentsNone):shape (batch_size, num_channels_latents, height // self.vae_scale_factor, width // self.vae_scale_factor)if isinstance(generator, list) and len(generator) ! batch_size:raise ValueError(fYou have passed a list of generators of length {len(generator)}, but requested an effective batchf size of {batch_size}. Make sure the batch size matches the length of the generators.)if latents is None:latents randn_tensor(shape, generatorgenerator, devicedevice, dtypedtype)else:latents latents.to(device)# scale the initial noise by the standard deviation required by the schedulerlatents latents * self.scheduler.init_noise_sigmareturn latentstorch.no_grad()replace_example_docstring(EXAMPLE_DOC_STRING)def __call__(self,processors: List[ControlNetProcessor],prompt: Union[str, List[str]] None,height: Optional[int] None,width: Optional[int] None,num_inference_steps: int 50,guidance_scale: float 7.5,negative_prompt: Optional[Union[str, List[str]]] None,num_images_per_prompt: Optional[int] 1,eta: float 0.0,generator: Optional[Union[torch.Generator, List[torch.Generator]]] None,latents: Optional[torch.FloatTensor] None,prompt_embeds: Optional[torch.FloatTensor] None,negative_prompt_embeds: Optional[torch.FloatTensor] None,output_type: Optional[str] pil,return_dict: bool True,callback: Optional[Callable[[int, int, torch.FloatTensor], None]] None,callback_steps: int 1,cross_attention_kwargs: Optional[Dict[str, Any]] None,):rFunction invoked when calling the pipeline for generation.Args:prompt (str or List[str], *optional*):The prompt or prompts to guide the image generation. 
If not defined, one has to pass prompt_embeds.instead.height (int, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The height in pixels of the generated image.width (int, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The width in pixels of the generated image.num_inference_steps (int, *optional*, defaults to 50):The number of denoising steps. More denoising steps usually lead to a higher quality image at theexpense of slower inference.guidance_scale (float, *optional*, defaults to 7.5):Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).guidance_scale is defined as w of equation 2. of [ImagenPaper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting guidance_scale 1. Higher guidance scale encourages to generate images that are closely linked to the text prompt,usually at the expense of lower image quality.negative_prompt (str or List[str], *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to passnegative_prompt_embeds. instead. If not defined, one has to pass negative_prompt_embeds. instead.Ignored when not using guidance (i.e., ignored if guidance_scale is less than 1).num_images_per_prompt (int, *optional*, defaults to 1):The number of images to generate per prompt.eta (float, *optional*, defaults to 0.0):Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to[schedulers.DDIMScheduler], will be ignored for others.generator (torch.Generator or List[torch.Generator], *optional*):One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)to make generation deterministic.latents (torch.FloatTensor, *optional*):Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for imagegeneration. Can be used to tweak the same generation with different prompts. If not provided, a latentstensor will ge generated by sampling using the supplied random generator.prompt_embeds (torch.FloatTensor, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from prompt input argument.negative_prompt_embeds (torch.FloatTensor, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. If not provided, negative_prompt_embeds will be generated from negative_prompt inputargument.output_type (str, *optional*, defaults to pil):The output format of the generate image. Choose between[PIL](https://pillow.readthedocs.io/en/stable/): PIL.Image.Image or np.array.return_dict (bool, *optional*, defaults to True):Whether or not to return a [~pipelines.stable_diffusion.StableDiffusionPipelineOutput] instead of aplain tuple.callback (Callable, *optional*):A function that will be called every callback_steps steps during inference. The function will becalled with the following arguments: callback(step: int, timestep: int, latents: torch.FloatTensor).callback_steps (int, *optional*, defaults to 1):The frequency at which the callback function will be called. 
If not specified, the callback will becalled at every step.cross_attention_kwargs (dict, *optional*):A kwargs dictionary that if specified is passed along to the AttnProcessor as defined underself.processor in[diffusers.cross_attention](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/cross_attention.py).Examples:Returns:[~pipelines.stable_diffusion.StableDiffusionPipelineOutput] or tuple:[~pipelines.stable_diffusion.StableDiffusionPipelineOutput] if return_dict is True, otherwise a tuple.When returning a tuple, the first element is a list with the generated images, and the second element is alist of bools denoting whether the corresponding generated image likely represents not-safe-for-work(nsfw) content, according to the safety_checker.# 0. Default height and width to unetheight, width processors[0].default_height_width(height, width)# 1. Check inputs. Raise error if not correctself.check_inputs(prompt, height, width, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds)for processor in processors:processor.check_inputs(prompt, prompt_embeds)# 2. Define call parametersif prompt is not None and isinstance(prompt, str):batch_size 1elif prompt is not None and isinstance(prompt, list):batch_size len(prompt)else:batch_size prompt_embeds.shape[0]device self._execution_device# here guidance_scale is defined analog to the guidance weight w of equation (2)# of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . guidance_scale 1# corresponds to doing no classifier free guidance.do_classifier_free_guidance guidance_scale 1.0# 3. Encode input promptprompt_embeds self._encode_prompt(prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt,prompt_embedsprompt_embeds,negative_prompt_embedsnegative_prompt_embeds,)# 4. Prepare imagefor processor in processors:processor.prepare_image(widthwidth,heightheight,batch_sizebatch_size * num_images_per_prompt,num_images_per_promptnum_images_per_prompt,devicedevice,do_classifier_free_guidancedo_classifier_free_guidance,)# 5. Prepare timestepsself.scheduler.set_timesteps(num_inference_steps, devicedevice)timesteps self.scheduler.timesteps# 6. Prepare latent variablesnum_channels_latents self.unet.in_channelslatents self.prepare_latents(batch_size * num_images_per_prompt,num_channels_latents,height,width,prompt_embeds.dtype,device,generator,latents,)# 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipelineextra_step_kwargs self.prepare_extra_step_kwargs(generator, eta)# 8. 
Denoising loopnum_warmup_steps len(timesteps) - num_inference_steps * self.scheduler.orderwith self.progress_bar(totalnum_inference_steps) as progress_bar:for i, t in enumerate(timesteps):# expand the latents if we are doing classifier free guidancelatent_model_input torch.cat([latents] * 2) if do_classifier_free_guidance else latentslatent_model_input self.scheduler.scale_model_input(latent_model_input, t)# controlnet inferencefor i, processor in enumerate(processors):down_samples, mid_sample processor(latent_model_input,t,encoder_hidden_statesprompt_embeds,return_dictFalse,)if i 0:down_block_res_samples, mid_block_res_sample down_samples, mid_sampleelse:down_block_res_samples [d_prev d_curr for d_prev, d_curr in zip(down_block_res_samples, down_samples)]mid_block_res_sample mid_block_res_sample mid_sample# predict the noise residualnoise_pred self.unet(latent_model_input,t,encoder_hidden_statesprompt_embeds,cross_attention_kwargscross_attention_kwargs,down_block_additional_residualsdown_block_res_samples,mid_block_additional_residualmid_block_res_sample,).sample# perform guidanceif do_classifier_free_guidance:noise_pred_uncond, noise_pred_text noise_pred.chunk(2)noise_pred noise_pred_uncond guidance_scale * (noise_pred_text - noise_pred_uncond)# compute the previous noisy sample x_t - x_t-1latents self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample# call the callback, if providedif i len(timesteps) - 1 or ((i 1) num_warmup_steps and (i 1) % self.scheduler.order 0):progress_bar.update()if callback is not None and i % callback_steps 0:callback(i, t, latents)# If we do sequential model offloading, lets offload unet and controlnet# manually for max memory savingsif hasattr(self, final_offload_hook) and self.final_offload_hook is not None:self.unet.to(cpu)torch.cuda.empty_cache()if output_type latent:image latentshas_nsfw_concept Noneelif output_type pil:# 8. Post-processingimage self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept self.run_safety_checker(image, device, prompt_embeds.dtype)# 10. Convert to PILimage self.numpy_to_pil(image)else:# 8. Post-processingimage self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept self.run_safety_checker(image, device, prompt_embeds.dtype)# Offload last model to CPUif hasattr(self, final_offload_hook) and self.final_offload_hook is not None:self.final_offload_hook.offload()if not return_dict:return (image, has_nsfw_concept)return StableDiffusionPipelineOutput(imagesimage, nsfw_content_detectedhas_nsfw_concept)# demo simple test
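# How the processors combine: in the denoising loop above, each ControlNetProcessor runs its
# own ControlNet on the current latents and scales the returned down-/mid-block residuals by
# its conditioning_scale; the pipeline then sums the residuals from all processors and feeds
# them to the UNet via down_block_additional_residuals / mid_block_additional_residual.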
def main():
    from diffusers.utils import load_image

    pipe = StableDiffusionMultiControlNetPipeline.from_pretrained(
        "./model", safety_checker=None, torch_dtype=torch.float16
    ).to("cuda")
    pipe.enable_xformers_memory_efficient_attention()

    controlnet_canny = ControlNetModel.from_pretrained(
        "lllyasviel/sd-controlnet-canny", cache_dir="./controlmodel", torch_dtype=torch.float16
    ).to("cuda")
    controlnet_pose = ControlNetModel.from_pretrained(
        "lllyasviel/sd-controlnet-openpose", cache_dir="./controlmodel", torch_dtype=torch.float16
    ).to("cuda")

    canny_left = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_left.png")
    canny_right = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_right.png")
    pose_right = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/pose_right.png")

    # two canny hints, one covering the left half and one covering the right half
    image = pipe(
        prompt="best quality, extremely detailed",
        negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
        processors=[
            ControlNetProcessor(controlnet_canny, canny_left),
            ControlNetProcessor(controlnet_canny, canny_right),
        ],
        generator=torch.Generator(device="cpu").manual_seed(0),
        num_inference_steps=30,
        width=512,
        height=512,
    ).images[0]
    image.save("./canny_left_right.png")

    # mix a canny hint (weight 0.5) with a pose hint (weight 1.6)
    image = pipe(
        prompt="best quality, extremely detailed",
        negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",
        processors=[
            ControlNetProcessor(controlnet_canny, canny_left, 0.5),
            ControlNetProcessor(controlnet_pose, pose_right, 1.6),
        ],
        generator=torch.Generator(device="cpu").manual_seed(0),
        num_inference_steps=30,
        width=512,
        height=512,
    ).images[0]
    image.save("./canny_left_pose_right.png")


if __name__ == "__main__":
    main()

How to implement and plug in features that diffusers does not have

Suppose we want to implement a Stable Diffusion + ControlNet + inpaint capability ourselves: how would we do it? Most of this task is wiring steps together along the generation flow, so the code essentially belongs in the stable_diffusion pipeline module. Suppose our implementation looks like the code below. It works against a specific diffusers version and will be upgraded later, so there is no rush to use it as-is; the point is to walk through the diffusers code framework and show how a modified module can be slotted into diffusers for your own use.

# file name: pipeline_stable_diffusion_controlnet_inpaint.py
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the License);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an AS IS BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
from typing import Any, Callable, Dict, List, Optional, Union

import numpy as np
import PIL.Image
import torch
from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizer

from ...models import AutoencoderKL, UNet2DConditionModel
from ...schedulers import KarrasDiffusionSchedulers
from ...utils import is_accelerate_available, logging, randn_tensor, replace_example_docstring
from ..pipeline_utils import DiffusionPipeline
from . import StableDiffusionPipelineOutput
from .safety_checker import StableDiffusionSafetyCheckerlogger logging.get_logger(__name__) # pylint: disableinvalid-nameEXAMPLE_DOC_STRING Examples:py from diffusers import StableDiffusionControlNetPipeline from diffusers.utils import load_image # Canny edged image for control canny_edged_image load_image(... https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_canny_edged.png... ) pipe StableDiffusionControlNetPipeline.from_pretrained(takuma104/control_sd15_canny).to(cuda) image pipe(promptbest quality, extremely detailed, controlnet_hintcanny_edged_image).images[0]
def prepare_mask_and_masked_image(image, mask):Prepares a pair (image, mask) to be consumed by the Stable Diffusion pipeline. This means that those inputs will beconverted to torch.Tensor with shapes batch x channels x height x width where channels is 3 for theimage and 1 for the mask.The image will be converted to torch.float32 and normalized to be in [-1, 1]. The mask will bebinarized (mask 0.5) and cast to torch.float32 too.Args:image (Union[np.array, PIL.Image, torch.Tensor]): The image to inpaint.It can be a PIL.Image, or a height x width x 3 np.array or a channels x height x widthtorch.Tensor or a batch x channels x height x width torch.Tensor.mask (_type_): The mask to apply to the image, i.e. regions to inpaint.It can be a PIL.Image, or a height x width np.array or a 1 x height x widthtorch.Tensor or a batch x 1 x height x width torch.Tensor.Raises:ValueError: torch.Tensor images should be in the [-1, 1] range. ValueError: torch.Tensor maskshould be in the [0, 1] range. ValueError: mask and image should have the same spatial dimensions.TypeError: mask is a torch.Tensor but image is not(ot the other way around).Returns:tuple[torch.Tensor]: The pair (mask, masked_image) as torch.Tensor with 4dimensions: batch x channels x height x width.if isinstance(image, torch.Tensor):if not isinstance(mask, torch.Tensor):raise TypeError(fimage is a torch.Tensor but mask (type: {type(mask)} is not)# Batch single imageif image.ndim 3:assert image.shape[0] 3, Image outside a batch should be of shape (3, H, W)image image.unsqueeze(0)# Batch and add channel dim for single maskif mask.ndim 2:mask mask.unsqueeze(0).unsqueeze(0)# Batch single mask or add channel dimif mask.ndim 3:# Single batched mask, no channel dim or single mask not batched but channel dimif mask.shape[0] 1:mask mask.unsqueeze(0)# Batched masks no channel dimelse:mask mask.unsqueeze(1)assert image.ndim 4 and mask.ndim 4, Image and Mask must have 4 dimensionsassert image.shape[-2:] mask.shape[-2:], Image and Mask must have the same spatial dimensionsassert image.shape[0] mask.shape[0], Image and Mask must have the same batch size# Check image is in [-1, 1]if image.min() -1 or image.max() 1:raise ValueError(Image should be in [-1, 1] range)# Check mask is in [0, 1]if mask.min() 0 or mask.max() 1:raise ValueError(Mask should be in [0, 1] range)# Binarize maskmask[mask 0.5] 0mask[mask 0.5] 1# Image as float32image image.to(dtypetorch.float32)elif isinstance(mask, torch.Tensor):raise TypeError(fmask is a torch.Tensor but image (type: {type(image)} is not)else:# preprocess imageif isinstance(image, (PIL.Image.Image, np.ndarray)):image [image]if isinstance(image, list) and isinstance(image[0], PIL.Image.Image):image [np.array(i.convert(RGB))[None, :] for i in image]image np.concatenate(image, axis0)elif isinstance(image, list) and isinstance(image[0], np.ndarray):image np.concatenate([i[None, :] for i in image], axis0)image image.transpose(0, 3, 1, 2)image torch.from_numpy(image).to(dtypetorch.float32) / 127.5 - 1.0# preprocess maskif isinstance(mask, (PIL.Image.Image, np.ndarray)):mask [mask]if isinstance(mask, list) and isinstance(mask[0], PIL.Image.Image):mask np.concatenate([np.array(m.convert(L))[None, None, :] for m in mask], axis0)mask mask.astype(np.float32) / 255.0elif isinstance(mask, list) and isinstance(mask[0], np.ndarray):mask np.concatenate([m[None, None, :] for m in mask], axis0)mask[mask 0.5] 0mask[mask 0.5] 1mask torch.from_numpy(mask)masked_image image * (mask 0.5)return mask, masked_imageclass 
StableDiffusionControlNetInpaintPipeline(DiffusionPipeline):rPipeline for text-to-image generation using Stable Diffusion with ControlNet guidance.This model inherits from [DiffusionPipeline]. Check the superclass documentation for the generic methods thelibrary implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)Args:vae ([AutoencoderKL]):Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.text_encoder ([CLIPTextModel]):Frozen text-encoder. Stable Diffusion uses the text portion of[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specificallythe [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.tokenizer (CLIPTokenizer):Tokenizer of class[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).unet ([UNet2DConditionModel]): Conditional U-Net architecture to denoise the encoded image latents.controlnet ([UNet2DConditionModel]):[ControlNet](https://arxiv.org/abs/2302.05543) architecture to generate guidance.scheduler ([SchedulerMixin]):A scheduler to be used in combination with unet to denoise the encoded image latents. Can be one of[DDIMScheduler], [LMSDiscreteScheduler], or [PNDMScheduler].safety_checker ([StableDiffusionSafetyChecker]):Classification module that estimates whether generated images could be considered offensive or harmful.Please, refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for details.feature_extractor ([CLIPFeatureExtractor]):Model that extracts features from generated images to be used as inputs for the safety_checker.def __init__(self,vae: AutoencoderKL,text_encoder: CLIPTextModel,tokenizer: CLIPTokenizer,unet: UNet2DConditionModel,controlnet: UNet2DConditionModel,scheduler: KarrasDiffusionSchedulers,safety_checker: StableDiffusionSafetyChecker,feature_extractor: CLIPFeatureExtractor,requires_safety_checker: bool True,):super().__init__()self.register_modules(vaevae,text_encodertext_encoder,tokenizertokenizer,unetunet,controlnetcontrolnet,schedulerscheduler,safety_checkersafety_checker,feature_extractorfeature_extractor,)self.vae_scale_factor 2 ** (len(self.vae.config.block_out_channels) - 1)self.register_to_config(requires_safety_checkerrequires_safety_checker)def enable_vae_slicing(self):rEnable sliced VAE decoding.When this option is enabled, the VAE will split the input tensor in slices to compute decoding in severalsteps. This is useful to save some memory and allow larger batch sizes.self.vae.enable_slicing()def disable_vae_slicing(self):rDisable sliced VAE decoding. If enable_vae_slicing was previously invoked, this method will go back tocomputing decoding in one step.self.vae.disable_slicing()def enable_sequential_cpu_offload(self, gpu_id0):rOffloads all models to CPU using accelerate, significantly reducing memory usage. 
When called, unet,text_encoder, vae and safety checker have their state dicts saved to CPU and then are moved to atorch.device(meta) and loaded to GPU only when their specific submodule has its forward method called.if is_accelerate_available():from accelerate import cpu_offloadelse:raise ImportError(Please install accelerate via pip install accelerate)device torch.device(fcuda:{gpu_id})for cpu_offloaded_model in [self.unet, self.text_encoder, self.vae]:cpu_offload(cpu_offloaded_model, device)if self.safety_checker is not None:cpu_offload(self.safety_checker, execution_devicedevice, offload_buffersTrue)propertydef _execution_device(self):rReturns the device on which the pipelines models will be executed. After callingpipeline.enable_sequential_cpu_offload() the execution device can only be inferred from Accelerates modulehooks.if self.device ! torch.device(meta) or not hasattr(self.unet, _hf_hook):return self.devicefor module in self.unet.modules():if (hasattr(module, _hf_hook)and hasattr(module._hf_hook, execution_device)and module._hf_hook.execution_device is not None):return torch.device(module._hf_hook.execution_device)return self.devicedef _encode_prompt(self,prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_promptNone,prompt_embeds: Optional[torch.FloatTensor] None,negative_prompt_embeds: Optional[torch.FloatTensor] None,):rEncodes the prompt into text encoder hidden states.Args:prompt (str or List[str], *optional*):prompt to be encodeddevice: (torch.device):torch devicenum_images_per_prompt (int):number of images that should be generated per promptdo_classifier_free_guidance (bool):whether to use classifier free guidance or notnegative_prompt (str or List[str], *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to passnegative_prompt_embeds. instead. If not defined, one has to pass negative_prompt_embeds. instead.Ignored when not using guidance (i.e., ignored if guidance_scale is less than 1).prompt_embeds (torch.FloatTensor, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from prompt input argument.negative_prompt_embeds (torch.FloatTensor, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. 
If not provided, negative_prompt_embeds will be generated from negative_prompt inputargument.if prompt is not None and isinstance(prompt, str):batch_size 1elif prompt is not None and isinstance(prompt, list):batch_size len(prompt)else:batch_size prompt_embeds.shape[0]if prompt_embeds is None:text_inputs self.tokenizer(prompt,paddingmax_length,max_lengthself.tokenizer.model_max_length,truncationTrue,return_tensorspt,)text_input_ids text_inputs.input_idsuntruncated_ids self.tokenizer(prompt, paddinglongest, return_tensorspt).input_idsif untruncated_ids.shape[-1] text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):removed_text self.tokenizer.batch_decode(untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1])logger.warning(The following part of your input was truncated because CLIP can only handle sequences up tof {self.tokenizer.model_max_length} tokens: {removed_text})if hasattr(self.text_encoder.config, use_attention_mask) and self.text_encoder.config.use_attention_mask:attention_mask text_inputs.attention_mask.to(device)else:attention_mask Noneprompt_embeds self.text_encoder(text_input_ids.to(device),attention_maskattention_mask,)prompt_embeds prompt_embeds[0]prompt_embeds prompt_embeds.to(dtypeself.text_encoder.dtype, devicedevice)bs_embed, seq_len, _ prompt_embeds.shape# duplicate text embeddings for each generation per prompt, using mps friendly methodprompt_embeds prompt_embeds.repeat(1, num_images_per_prompt, 1)prompt_embeds prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1)# get unconditional embeddings for classifier free guidanceif do_classifier_free_guidance and negative_prompt_embeds is None:uncond_tokens: List[str]if negative_prompt is None:uncond_tokens [] * batch_sizeelif type(prompt) is not type(negative_prompt):raise TypeError(fnegative_prompt should be the same type to prompt, but got {type(negative_prompt)} !f {type(prompt)}.)elif isinstance(negative_prompt, str):uncond_tokens [negative_prompt]elif batch_size ! len(negative_prompt):raise ValueError(fnegative_prompt: {negative_prompt} has batch size {len(negative_prompt)}, but prompt:f {prompt} has batch size {batch_size}. 
                    " Please make sure that passed `negative_prompt` matches the batch size of `prompt`."
                )
            else:
                uncond_tokens = negative_prompt

            max_length = prompt_embeds.shape[1]
            uncond_input = self.tokenizer(
                uncond_tokens,
                padding="max_length",
                max_length=max_length,
                truncation=True,
                return_tensors="pt",
            )

            if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:
                attention_mask = uncond_input.attention_mask.to(device)
            else:
                attention_mask = None

            negative_prompt_embeds = self.text_encoder(
                uncond_input.input_ids.to(device),
                attention_mask=attention_mask,
            )
            negative_prompt_embeds = negative_prompt_embeds[0]

        if do_classifier_free_guidance:
            # duplicate unconditional embeddings for each generation per prompt, using mps friendly method
            seq_len = negative_prompt_embeds.shape[1]

            negative_prompt_embeds = negative_prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)

            negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)
            negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)

            # For classifier free guidance, we need to do two forward passes.
            # Here we concatenate the unconditional and text embeddings into a single batch
            # to avoid doing two forward passes
            prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])

        return prompt_embeds

    def run_safety_checker(self, image, device, dtype):
        if self.safety_checker is not None:
            safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(device)
            image, has_nsfw_concept = self.safety_checker(
                images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
            )
        else:
            has_nsfw_concept = None
        return image, has_nsfw_concept

    def decode_latents(self, latents):
        latents = 1 / self.vae.config.scaling_factor * latents
        image = self.vae.decode(latents).sample
        image = (image / 2 + 0.5).clamp(0, 1)
        # we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16
        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        return image

    def prepare_extra_step_kwargs(self, generator, eta):
        # prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
        # eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
        # eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502
        # and should be between [0, 1]
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        # check if the scheduler accepts generator
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        return extra_step_kwargs

    def check_inputs(
        self,
        prompt,
        height,
        width,
        callback_steps,
        negative_prompt=None,
        prompt_embeds=None,
        negative_prompt_embeds=None,
    ):
        if height % 8 != 0 or width % 8 != 0:
            raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")

        if (callback_steps is None) or (
            callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
        ):
            raise ValueError(
                f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
                f" {type(callback_steps)}."
            )

        if prompt is not None and prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
                " only forward one of the two."
            )
        elif prompt is None and prompt_embeds is None:
            raise ValueError(
                "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
            )
        elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        if negative_prompt is not None and negative_prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )

        if prompt_embeds is not None and negative_prompt_embeds is not None:
            if prompt_embeds.shape != negative_prompt_embeds.shape:
                raise ValueError(
                    "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                    f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                    f" {negative_prompt_embeds.shape}."
                )

    def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
        shape = (batch_size, num_channels_latents, height // self.vae_scale_factor, width // self.vae_scale_factor)
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            latents = latents.to(device)

        # scale the initial noise by the standard deviation required by the scheduler
        latents = latents * self.scheduler.init_noise_sigma
        return latents

    def controlnet_hint_conversion(self, controlnet_hint, height, width, num_images_per_prompt):
        channels = 3
        if isinstance(controlnet_hint, torch.Tensor):
            # torch.Tensor: acceptable shapes are chw, bchw (b==1) or bchw (b==num_images_per_prompt)
            shape_chw = (channels, height, width)
            shape_bchw = (1, channels, height, width)
            shape_nchw = (num_images_per_prompt, channels, height, width)
            if controlnet_hint.shape in [shape_chw, shape_bchw, shape_nchw]:
                controlnet_hint = controlnet_hint.to(dtype=self.controlnet.dtype, device=self.controlnet.device)
                if controlnet_hint.shape != shape_nchw:
                    controlnet_hint = controlnet_hint.repeat(num_images_per_prompt, 1, 1, 1)
                return controlnet_hint
            else:
                raise ValueError(
                    f"Acceptable shape of `controlnet_hint` is any of ({channels}, {height}, {width}),"
                    f" (1, {channels}, {height}, {width}) or ({num_images_per_prompt},"
                    f" {channels}, {height}, {width}) but is {controlnet_hint.shape}"
                )
        elif isinstance(controlnet_hint, np.ndarray):
            # np.ndarray: acceptable shapes are hw, hwc, bhwc (b==1) or bhwc (b==num_images_per_prompt)
            # hwc is the OpenCV-compatible image format; the color channel order must be BGR.
            if controlnet_hint.shape == (height, width):
                controlnet_hint = np.repeat(controlnet_hint[:, :, np.newaxis], channels, axis=2)  # hw -> hwc (c==3)
            shape_hwc = (height, width, channels)
            shape_bhwc = (1, height, width, channels)
            shape_nhwc = (num_images_per_prompt, height, width, channels)
            if controlnet_hint.shape in [shape_hwc, shape_bhwc, shape_nhwc]:
                controlnet_hint = torch.from_numpy(controlnet_hint.copy())
                controlnet_hint = controlnet_hint.to(dtype=self.controlnet.dtype, device=self.controlnet.device)
                controlnet_hint /= 255.0
                if controlnet_hint.shape != shape_nhwc:
                    controlnet_hint = controlnet_hint.repeat(num_images_per_prompt, 1, 1, 1)
                controlnet_hint = controlnet_hint.permute(0, 3, 1, 2)  # b h w c -> b c h w
                return controlnet_hint
            else:
                raise ValueError(
                    f"Acceptable shape of `controlnet_hint` is any of ({height}, {width}),"
                    f" ({height}, {width}, {channels}),"
                    f" (1, {height}, {width}, {channels}) or"
                    f" ({num_images_per_prompt}, {height}, {width}, {channels}) but is {controlnet_hint.shape}"
                )
        elif isinstance(controlnet_hint, PIL.Image.Image):
            if controlnet_hint.size == (width, height):
                controlnet_hint = controlnet_hint.convert("RGB")  # make sure 3 channel RGB format
                controlnet_hint = np.array(controlnet_hint)  # to numpy
                controlnet_hint = controlnet_hint[:, :, ::-1]  # RGB -> BGR
                return self.controlnet_hint_conversion(controlnet_hint, height, width, num_images_per_prompt)
            else:
                raise ValueError(
                    f"Acceptable image size of `controlnet_hint` is ({width}, {height}) but is {controlnet_hint.size}"
                )
        else:
            raise ValueError(
                f"Acceptable type of `controlnet_hint` is any of torch.Tensor, np.ndarray, PIL.Image.Image but"
                f" is {type(controlnet_hint)}"
            )

    def prepare_mask_latents(
        self, mask, masked_image, batch_size, height, width, dtype, device, generator, do_classifier_free_guidance
    ):
        # resize the mask to latents shape as we concatenate the mask to the latents
        # we do that before converting to dtype to avoid breaking in case we're using cpu_offload
        # and half precision
        mask = torch.nn.functional.interpolate(
            mask, size=(height // self.vae_scale_factor, width // self.vae_scale_factor)
        )
        mask = mask.to(device=device, dtype=dtype)

        masked_image = masked_image.to(device=device, dtype=dtype)

        # encode the mask image into latents space so we can concatenate it to the latents
        if isinstance(generator, list):
            masked_image_latents = [
                self.vae.encode(masked_image[i : i + 1]).latent_dist.sample(generator=generator[i])
                for i in range(batch_size)
            ]
            masked_image_latents = torch.cat(masked_image_latents, dim=0)
        else:
            masked_image_latents = self.vae.encode(masked_image).latent_dist.sample(generator=generator)
        masked_image_latents = self.vae.config.scaling_factor * masked_image_latents

        # duplicate mask and masked_image_latents for each generation per prompt, using mps friendly method
        if mask.shape[0] < batch_size:
            if not batch_size % mask.shape[0] == 0:
                raise ValueError(
                    "The passed mask and the required batch size don't match. Masks are supposed to be duplicated to"
                    f" a total batch size of {batch_size}, but {mask.shape[0]} masks were passed. Make sure the number"
                    " of masks that you pass is divisible by the total requested batch size."
                )
            mask = mask.repeat(batch_size // mask.shape[0], 1, 1, 1)
        if masked_image_latents.shape[0] < batch_size:
            if not batch_size % masked_image_latents.shape[0] == 0:
                raise ValueError(
                    "The passed images and the required batch size don't match. Images are supposed to be duplicated"
                    f" to a total batch size of {batch_size}, but {masked_image_latents.shape[0]} images were passed."
                    " Make sure the number of images that you pass is divisible by the total requested batch size."
                )
            masked_image_latents = masked_image_latents.repeat(batch_size // masked_image_latents.shape[0], 1, 1, 1)

        mask = torch.cat([mask] * 2) if do_classifier_free_guidance else mask
        masked_image_latents = (
            torch.cat([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents
        )

        # aligning device to prevent device errors when concating it with the latent model input
        masked_image_latents = masked_image_latents.to(device=device, dtype=dtype)
        return mask, masked_image_latents

    @torch.no_grad()
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    def __call__(
        self,
        prompt: Union[str, List[str]] = None,
        height: Optional[int] = None,
        width: Optional[int] = None,
        num_inference_steps: int = 50,
        guidance_scale: float = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: float = 0.0,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.FloatTensor] = None,
        prompt_embeds: Optional[torch.FloatTensor] = None,
        negative_prompt_embeds: Optional[torch.FloatTensor] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
        callback_steps: Optional[int] = 1,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        controlnet_hint: Optional[Union[torch.FloatTensor, np.ndarray, PIL.Image.Image]] = None,
        image: Union[torch.FloatTensor, PIL.Image.Image] = None,
        mask_image: Union[torch.FloatTensor, PIL.Image.Image] = None,
    ):
        r"""
        Function invoked when calling the pipeline for generation.

        Args:
            prompt (`str` or `List[str]`, *optional*):
                The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`
                instead.
            height (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):
                The height in pixels of the generated image.
            width (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):
                The width in pixels of the generated image.
            num_inference_steps (`int`, *optional*, defaults to 50):
                The number of denoising steps. More denoising steps usually lead to a higher quality image at the
                expense of slower inference.
            guidance_scale (`float`, *optional*, defaults to 7.5):
                Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
                `guidance_scale` is defined as `w` of equation 2. of [Imagen
                Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >
                1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,
                usually at the expense of lower image quality.
            negative_prompt (`str` or `List[str]`, *optional*):
                The prompt or prompts not to guide the image generation. If not defined, one has to pass
                `negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `guidance_scale`
                is less than `1`).
            num_images_per_prompt (`int`, *optional*, defaults to 1):
                The number of images to generate per prompt.
            eta (`float`, *optional*, defaults to 0.0):
                Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to
                [`schedulers.DDIMScheduler`], will be ignored for others.
            generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
                One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)
                to make generation deterministic.
            latents (`torch.FloatTensor`, *optional*):
                Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image
                generation. Can be used to tweak the same generation with different prompts. If not provided, a
                latents tensor will be generated by sampling using the supplied random `generator`.
            prompt_embeds (`torch.FloatTensor`, *optional*):
                Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If
                not provided, text embeddings will be generated from `prompt` input argument.
            negative_prompt_embeds (`torch.FloatTensor`, *optional*):
                Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt
                weighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` input
                argument.
            output_type (`str`, *optional*, defaults to `"pil"`):
                The output format of the generated image. Choose between
                [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a
                plain tuple.
            callback (`Callable`, *optional*):
                A function that will be called every `callback_steps` steps during inference. The function will be
                called with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.
            callback_steps (`int`, *optional*, defaults to 1):
                The frequency at which the `callback` function will be called. If not specified, the callback will be
                called at every step.
            cross_attention_kwargs (`dict`, *optional*):
                A kwargs dictionary that if specified is passed along to the `AttnProcessor` as defined under
                `self.processor` in
                [diffusers.cross_attention](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/cross_attention.py).
            controlnet_hint (`torch.FloatTensor`, `np.ndarray` or `PIL.Image.Image`, *optional*):
                ControlNet input embedding. ControlNet generates guidances using this input embedding. If the type is
                specified as `torch.FloatTensor`, it is passed to ControlNet as is. If the type is `np.ndarray`, it is
                assumed to be an OpenCV compatible image format. `PIL.Image.Image` can also be accepted as an image.
                The size of all these types must correspond to the output image size.

        Examples:

        Returns:
            [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:
            [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a
            `tuple`. When returning a tuple, the first element is a list with the generated images, and the second
            element is a list of `bool`s denoting whether the corresponding generated image likely represents
            "not-safe-for-work" (nsfw) content, according to the `safety_checker`.
        """
        # 0. Default height and width to unet
        height = height or self.unet.config.sample_size * self.vae_scale_factor
        width = width or self.unet.config.sample_size * self.vae_scale_factor

        # 1. Control embedding check & conversion
        if controlnet_hint is not None:
            controlnet_hint = self.controlnet_hint_conversion(controlnet_hint, height, width, num_images_per_prompt)

        # 2. Check inputs. Raise error if not correct
        self.check_inputs(
            prompt, height, width, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds
        )

        # 3. Define call parameters
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        device = self._execution_device
        # here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)
        # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
        # corresponds to doing no classifier free guidance.
        do_classifier_free_guidance = guidance_scale > 1.0

        # 4. Encode input prompt
        prompt_embeds = self._encode_prompt(
            prompt,
            device,
            num_images_per_prompt,
            do_classifier_free_guidance,
            negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
        )

        # prepare the inpainting mask and the masked image
        mask, masked_image = prepare_mask_and_masked_image(image, mask_image)

        # 5. Prepare timesteps
        self.scheduler.set_timesteps(num_inference_steps, device=device)
        timesteps = self.scheduler.timesteps

        # 6. Prepare latent variables
        num_channels_latents = self.unet.in_channels
        latents = self.prepare_latents(
            batch_size * num_images_per_prompt,
            num_channels_latents,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            latents,
        )

        mask, masked_image_latents = self.prepare_mask_latents(
            mask,
            masked_image,
            batch_size * num_images_per_prompt,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            do_classifier_free_guidance,
        )
        num_channels_mask = mask.shape[1]
        num_channels_masked_image = masked_image_latents.shape[1]

        # 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # 8. Denoising loop
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        with self.progress_bar(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                # expand the latents if we are doing classifier free guidance
                latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

                if controlnet_hint is not None:
                    # ControlNet produces the control residuals that are injected into the unet
                    control = self.controlnet(
                        latent_model_input, t, encoder_hidden_states=prompt_embeds, controlnet_hint=controlnet_hint
                    )
                    control = [item for item in control]
                    # concatenate mask and masked image latents for the inpainting unet
                    latent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)
                    noise_pred = self.unet(
                        latent_model_input,
                        t,
                        encoder_hidden_states=prompt_embeds,
                        cross_attention_kwargs=cross_attention_kwargs,
                        control=control,
                    ).sample
                else:
                    # predict the noise residual
                    latent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)
                    noise_pred = self.unet(
                        latent_model_input,
                        t,
                        encoder_hidden_states=prompt_embeds,
                        cross_attention_kwargs=cross_attention_kwargs,
                    ).sample

                # perform guidance
                if do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

                # compute the previous noisy sample x_t -> x_t-1
                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample

                # call the callback, if provided
                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
                    if callback is not None and i % callback_steps == 0:
                        callback(i, t, latents)

        if output_type == "latent":
            image = latents
            has_nsfw_concept = None
        elif output_type == "pil":
            # 8. Post-processing
            image = self.decode_latents(latents)

            # 9. Run safety checker
            image, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)

            # 10. Convert to PIL
            image = self.numpy_to_pil(image)
        else:
            # 8. Post-processing
            image = self.decode_latents(latents)

            # 9. Run safety checker
            image, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)

        if not return_dict:
            return (image, has_nsfw_concept)

        return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
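With the pipeline class assembled, a minimal usage sketch could look like the snippet below. This is only a sketch under several assumptions: the class name StableDiffusionControlNetInpaintPipeline, its import path, and the checkpoint ids are placeholders for illustration, and the unet is assumed to be the modified one that accepts the control= argument used above; substitute whatever names your own file and environment actually define.

# Minimal usage sketch (assumed class name, import path and model ids; adapt to your local build)
import torch
from diffusers import ControlNetModel
from diffusers import StableDiffusionControlNetInpaintPipeline  # hypothetical: the class defined above
from diffusers.utils import load_image

controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny")
pipe = StableDiffusionControlNetInpaintPipeline.from_pretrained(
    "runwayml/stable-diffusion-inpainting", controlnet=controlnet, safety_checker=None
).to("cuda")

# the initial image, the inpainting mask and the ControlNet hint must all match the output size (512x512 here)
init_image = load_image("https://huggingface.co/datasets/diffusers/test-arrays/resolve/main/stable_diffusion_inpaint/input_bench_image.png")
mask_image = load_image("https://huggingface.co/datasets/diffusers/test-arrays/resolve/main/stable_diffusion_inpaint/input_bench_mask.png")
hint_image = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_canny_edged.png")

generator = torch.Generator(device="cpu").manual_seed(3)
image = pipe(
    prompt="best quality, extremely detailed",
    negative_prompt="lowres, bad anatomy, worst quality, low quality",
    image=init_image,
    mask_image=mask_image,
    controlnet_hint=hint_image,
    num_inference_steps=30,
    generator=generator,
).images[0]
image.save("controlnet_inpaint_result.png")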
Place this file under the project path shown below. Every __init__.py along that path then needs the newly added class registered; you can follow the style of the existing entries in those files, which generally look like the following.
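For instance, assuming the new file is named pipeline_stable_diffusion_controlnet_inpaint.py and the class is StableDiffusionControlNetInpaintPipeline (both names are placeholders; use your actual file and class names), the lines added to each __init__.py would look roughly like this:

# src/diffusers/pipelines/stable_diffusion/__init__.py  (assumed location of the new pipeline file)
from .pipeline_stable_diffusion_controlnet_inpaint import StableDiffusionControlNetInpaintPipeline

# src/diffusers/pipelines/__init__.py
from .stable_diffusion import StableDiffusionControlNetInpaintPipeline

# src/diffusers/__init__.py
from .pipelines import StableDiffusionControlNetInpaintPipeline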