이전 포스트에서 애저 Durable Functions를 통해 워크플로우 오케스트레이션에 대해 다뤄보았다. 이런 오케스트레이션은 이 Durable Functions의 Stateful한 특성 때문인데, 이 특성을 이용하면 훨씬 더 다양한 용도로 사용할 수 있다. 이번 포스트에서는 바로 이 Durable Functions의 Stateful 속성을 이용해서 RequestBin 애플리케이션 기능을 구현해 보기로 한다.
이 포스트에 쓰인 샘플 코드는 Durable RequestBin Sample에서 다운로드 받을 수 있다.
웹훅을 테스트 하기 위해 사용했던 추억의 RequestBin 앱 첫화면을 다들 기억할 것이다.
현재는 정식으로 서비스 되고 있지 않고 다만 소스 코드와 함께 샘플 형태로 운영되고 있다. 따라서, 내가 직접 이 서비스를 운영하고 싶다면 도커 컨테이너를 이용해서 어딘가에 호스팅을 해야 한다. 이와 관련해서 애저 컨테이너 인스턴스를 활용해서 올리는 방법과 애저 앱 서비스를 활용해서 올리는 방법을 예전에 소개한 적이 있다. 오리지널 RequestBin 앱은 애플리케이션과 Redis 캐시로 구성이 되어 있는데, 캐시의 특성상 언제든 데이터가 소실될 수 있기 때문에 가끔은 오래된 웹훅 히스토리를 확인하고 싶을 때에는 난감할 수 있다.
마침 애저 Durable Functions는 자체적으로 테이블 저장소 기능을 이용해서 이벤트 소싱 패턴을 구현해 놓았다. 또한 이를 통해 데이터를 Stateful하게 저장할 수 있기 때문에 코드와 데이터를 동시에 다뤄야 하는 RequestBin 애플리케이션을 처음부터 만들어 보기에 아주 적절한 예시가 될 수 있다.
상태 저장 엔티티
Durable Functions의 오케스트레이션 기능은 IDurableOrchestrationClient
를 통해 State (상태)를 암시적으로 저장하는 반면에, 상태 저장 엔티티를 사용하면 IDurableClient
를 통해 이 State를 명시적으로 저장하고 호출한다. 따라서, 대략 아래와 같은 모양이 될 것이다. 오케스트레이션을 구현하는 대신 State에 직접 접근하는 IDurableClient
인스턴스가 보일 것이다 (line #4).
[FunctionName(nameof(CreateBin))] | |
public async Task<IActionResult> CreateBin( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route="bins")] HttpRequest req, | |
[DurableClient] IDurableClient client) | |
{ | |
... | |
} |
이제 이를 바탕으로 상태를 정의하는 엔티티를 생성한다. binId
는 유일한 값이라면 뭐가 되든 상관 없다. 여기서는 GUID를 사용한다 (line #6). EntityId
가 바로 상태를 관장하는 값이다 (line #7).
[FunctionName(nameof(CreateBin))] | |
public async Task<IActionResult> CreateBin( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route="bins")] HttpRequest req, | |
[DurableClient] IDurableClient client) | |
{ | |
var binId = Guid.NewGuid(); | |
var bin = new EntityId("Bin", binId.ToString()); | |
... | |
} |
위 코드에서 보면 "Bin"
이라는 값은 이 상태를 명시적으로 저장하고 삭제하는 엔티티의 이름이다. 이 엔티티는 액터 모델의 구현을 따라간다. 엔티티의 상태와 더불어 어떤 식으로 엔티티의 상태를 변경시킬 수 있는지에 대한 액션도 정의되어 있다. 대략 아래와 같은 모습이다. 먼저 IBin
인터페이스를 통해 상태 변경과 관련한 액션을 정의한다. 여기서는 상태를 추가하고 리셋하는 역할만 한다.
public interface IBin | |
{ | |
void Add(BinItem item); | |
void Reset(); | |
} |
그리고 아래와 같이 Bin
클라스로 인터페이스 구현을 하는데, 상태 저장을 위한 History
라는 속성을 정의한다 (line #5). 이 때 클라스 데코레이터로 직렬화 옵션을 MemberSerialization.OptIn
라고 주면 (line #1) 명시적으로 JsonProperty
데코레이터를 선언한 속성에 대해서만 직렬화를 시도한다 (line #4). 마지막 줄에 보면 Run()
라는 이름의 정적 메소드가 있는데 (line #23), 이를 통해 이벤트를 발생시켜 테이블 저장소에 상태를 저장하게 된다.
[JsonObject(MemberSerialization = MemberSerialization.OptIn)] | |
public class Bin : IBin | |
{ | |
[JsonProperty("history")] | |
public virtual List<BinItem> History { get; set; } = new List<BinItem>(); | |
public void Add(BinItem item) | |
{ | |
if (item == null) | |
{ | |
return; | |
} | |
this.History.Insert(0, item); | |
} | |
public void Reset() | |
{ | |
this.History.Clear(); | |
} | |
[FunctionName(nameof(Bin))] | |
public static Task Run([EntityTrigger] IDurableEntityContext ctx) => ctx.DispatchAsync<Bin>(); | |
} |
Bin 생성
그렇다면, 이제 이 엔티티에 어떻게 상태를 저장할까? SignalEntityAsync()
메소드를 통해 엔티티에 구현한 메소드를 호출한다 (line #8). 여기서는 비어있는 Bin 객체만 반환시킬 예정이므로 null
값을 보내게 된다. 이렇게 해서 비어있는 Bin이 하나 만들어졌다.
[FunctionName(nameof(CreateBin))] | |
public async Task<IActionResult> CreateBin( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route="bins")] HttpRequest req, | |
[DurableClient] IDurableClient client) | |
{ | |
... | |
await client.SignalEntityAsync<IBin>(bin, o => o.Add(null)); | |
... | |
} |
여기까지 해서 펑션 앱을 실제로 돌려보면 테이블 저장소에 아래와 같은 형태로 레코드가 생성된 것이 보일 것이다. history
필드에 비어있는 배열만 보이는가? 현재 Bin
만 만들어졌기 때문이다.
웹훅 요청 저장
이제 웹훅 요청 히스토리를 하나씩 저장시켜 보자. 앞서 만든 엔드포인트와 거의 비슷하다. 다만 이번에는 요청 데이터를 넣어줘야 한다. 타임스탬프, 요청 메소드, 헤더, 쿼리스트링, 페이로드를 모두 캡쳐해서 저장한다 (line #10-14).
[FunctionName(nameof(AddHistory))] | |
public async Task<IActionResult> AddHistory( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", "put", "patch", "delete", Route="bins/{binId}")] HttpRequest req, | |
[DurableClient] IDurableClient client, | |
string binId) | |
{ | |
var history = new BinItem(); | |
using (var reader = new StreamReader(req.Body)) | |
{ | |
history.Timestamp = DateTimeOffset.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffzzz"); | |
history.Method = req.Method; | |
history.Headers = req.Headers.AsEnumerable().ToDictionary(p => p.Key, p => string.Join(";", p.Value)); | |
history.Queries = req.Query.ToDictionary(p => p.Key, p => string.Join(";", p.Value)); | |
history.Body = await reader.ReadToEndAsync(); | |
} | |
... | |
} |
그리고, 앞서와 같이 bin
을 만들어 SignalEntityAsync()
메소드를 통해 히스토리를 추가한다 (line #11).
[FunctionName(nameof(AddHistory))] | |
public async Task<IActionResult> AddHistory( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", "put", "patch", "delete", Route="bins/{binId}")] HttpRequest req, | |
[DurableClient] IDurableClient client, | |
string binId) | |
{ | |
... | |
var bin = new EntityId("Bin", binId); | |
await client.SignalEntityAsync<IBin>(bin, o => o.Add(history)); | |
... | |
} |
이렇게 한 후 실제로 웹훅 요청을 날려보면 테이블 저장소의 데이터가 아래와 같이 변경된 것이 보인다. 실제로 요청 데이터가 저장된 것이다.
웹훅 히스토리 조회
그렇다면, 지금까지 저장해 놓은 웹훅 히스토리를 열어봐야 할 필요도 있을 것이다. 이 경우는 아래와 같이 먼저 Bin 레퍼런스를 생성한다 (line #7).
[FunctionName(nameof(GetHistory))] | |
public async Task<IActionResult> GetHistory( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route="bins/{binId}/history")] HttpRequest req, | |
[DurableClient] IDurableClient client, | |
string binId) | |
{ | |
var bin = new EntityId("Bin", binId); | |
... | |
} |
그리고 난 뒤, ReadEntityStateAsync()
메소드를 통해 현재 상태를 가져와서 응답 객체로 반환한다 (line #9).
[FunctionName(nameof(GetHistory))] | |
public async Task<IActionResult> GetHistory( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route="bins/{binId}/history")] HttpRequest req, | |
[DurableClient] IDurableClient client, | |
string binId) | |
{ | |
... | |
var entity = await client.ReadEntityStateAsync<Bin>(bin); | |
var payload = entity.EntityState; | |
var result = new JsonObjectContentResult(HttpStatusCode.OK, payload); | |
return result; | |
} |
이렇게 하면 아래와 같이 저장된 웹훅 요청 데이터에 대한 히스토리를 볼 수 있다.
웹훅 히스토리 삭제
이번에는 Bin 안에 저장된 모든 히스토리를 삭제해보자. 먼저 Bin 레퍼런스를 생성한다 (line #7).
[FunctionName(nameof(ResetHistory))] | |
public async Task<IActionResult> ResetHistory( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "delete", Route="bins/{binId}/reset")] HttpRequest req, | |
[DurableClient] IDurableClient client, | |
string binId) | |
{ | |
var bin = new EntityId("Bin", binId); | |
... | |
} |
그리고 난 후 이번에는 SignalEntityAsync()
메소드를 통해 Bin
액터의 Reset()
메소드를 호출한다 (line #9).
[FunctionName(nameof(ResetHistory))] | |
public async Task<IActionResult> ResetHistory( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "delete", Route="bins/{binId}/reset")] HttpRequest req, | |
[DurableClient] IDurableClient client, | |
string binId) | |
{ | |
... | |
await client.SignalEntityAsync<IBin>(bin, o => o.Reset()); | |
... | |
} |
이후 테이블 저장소를 조회해 보면 모든 웹훅 히스토리가 사라진 것이 보인다.
Bin 삭제
이제 마지막으로 이 Bin이 더이상 필요없을 때 삭제하는 엔드포인트를 만들어 보자. 먼저 Bin 레퍼런스를 생성한다 (line #7).
[FunctionName(nameof(PurgeBin))] | |
public async Task<IActionResult> PurgeBin( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "delete", Route="bins/{binId}/purge")] HttpRequest req, | |
[DurableClient] IDurableClient client, | |
string binId) | |
{ | |
var bin = new EntityId("Bin", binId); | |
... | |
} |
그리고, PurgeInstanceHistoryAsync()
메소드를 통해 엔티티 자체를 테이블 저장소에서 삭제한다 (line #9).
[FunctionName(nameof(PurgeBin))] | |
public async Task<IActionResult> PurgeBin( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "delete", Route="bins/{binId}/purge")] HttpRequest req, | |
[DurableClient] IDurableClient client, | |
string binId) | |
{ | |
... | |
await client.PurgeInstanceHistoryAsync($"@{bin.EntityName}@{bin.EntityKey}"); | |
var result = new NoContentResult(); | |
return result; | |
} |
실제로 이 엔드포인트를 호출하면 아래와 같이 테이블 저장소에서 엔티티가 완전히 사라진 것을 확인할 수 있다.
이렇게 RequestBin 앱을 구현해 봤다. 이 코드만 가지고서는 간단한 웹훅 확인 용도로 사용하는데에는 큰 문제가 없다. 여기에 더해 조금 더 UI를 붙여준다거나 하면 좀 더 완성도가 높은 앱이 될 것이다. 이 실습의 포인트는 Durable Functions의 Stateful한 특성을 오케스트레이션 용도 뿐만 아니라 직접 액세스를 통해 다양한 활용도를 실험해 볼 수 있다는 데 있다. 앞으로 이 Durable Functions을 통해 좀 더 다양한 워크플로우 관리를 할 수 있기를 기대한다.